当前位置: 首页 > news >正文

家用机做网站服务器网站开发与网页设计大作业

家用机做网站服务器,网站开发与网页设计大作业,wordpress 删除标签,wordpress公司模板下载1 前言 在Spring Kafka中,可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。 2 基于Kafka管理的拦截器 Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor和 org.apache.kafka.cli…

1 前言

在Spring Kafka中,可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。

2 基于Kafka管理的拦截器

Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor
org.apache.kafka.clients.consumer.ConsumerInterceptor, 示例如下:

2.1 定义拦截器

生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 在发送消息之前操作System.out.println("Sending message: " + record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {// 可以在这里获取配置}
}

2.2 定义消费者拦截器

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {@Overridepublic void configure(Map<String, ?> configs) {// 配置拦截器}@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {// 处理接收到的消息records.forEach(record -> {System.out.println("Consumed message: " + record.value());});return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}@Overridepublic void close() {// 资源清理}
}

2.3 添加拦截器

方式一,通过工厂自定义器设置拦截器

import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer, DefaultKafkaConsumerFactoryCustomizer {@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}@Overridepublic void customize(DefaultKafkaConsumerFactory<?, ?> consumerFactory) {consumerFactory.updateConfigs(Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName()));}
}

方式二,通过配置设置拦截器

spring:kafka:producer:properties:interceptor.classes: org.example.kafka.CustomProducerInterceptorconsumer:properties:interceptor.classes: org.example.kafka.CustomConsumerInterceptor

2.4 拦截器使用Spring容器中的Bean

上面的方法可以看到,拦截器由于没有在Spring容器中管理,则无法使用容器中其他Bean来做业务处理,那么可以另外一种策略达到让拦截器受Spring容器管理的需求, 已消息生产者拦截器为例:
Bean定义

@Component
public class MyComponent {public void checkMessage(String message) {System.out.println("Sending message: " + message);}
}

生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {private MyComponent myComponent;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {myComponent.checkMessage(record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {myComponent = configs.get("myComponent");}
}

设置拦截器

import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer {@Autowiredprivate MyComponent myComponent;@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of("myComponent", myComponent,ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}
}

3 基于Spring-Kafka管理的拦截器

基于Kafka管理的拦截器对于消费消息的拦截只能做到批量消费级别(ConsumerRecords),如果要对单条消息拦截,可以使用Spring-Kafka提供的org.springframework.kafka.listener.RecordInterceptor接口。

3.1 单条消息拦截接口定义

由于此拦截器是受Spring容器管理的,所以可以通过@Component注解自动注入到容器中,进行自动拦截。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;@Component
public class CustomRecordInterceptor implements RecordInterceptor<Object, Object> {@Overridepublic ConsumerRecord<Object, Object> intercept(ConsumerRecord<Object, Object> record) {System.out.println(record.topic());return record;}
}
http://www.yayakq.cn/news/839375/

相关文章:

  • 上海门户网站建设方案营销平台
  • 网站制作寻找客户淘宝详情页设计一个多少钱
  • 大家做公司网站 都是在哪里做的深圳万户网络科技有限公司
  • 重庆网站建设只选承越你们网站做301
  • 做简历的网站 知乎那个网站平台可以做兼职
  • 有哪些建设工程类网站阿里巴巴网站建设分析
  • 苏州建设工程招标网站wordpress登陆网址
  • 未来做那个网站能致富怎么免费制作网站平台
  • 先做产品网站还是app小装修网站开发费用
  • 查询注册过哪些网站wordpress remove google
  • 西安网站制作哪家便宜又好dw建设手机网站
  • 建设网站和别人公司重名资源网站优化排名
  • 天津网站建设方案咨询简述网站建设的基本特征
  • 网站建设客户需求分析调查表建设网站收费
  • 网站百度排名查询服装设计公司排名前十强
  • 滴滴优惠券网站怎么做wordpress文章图片幻灯片
  • 华为荣耀手机商城官方网站哈尔滨网站建设 哈尔滨网站推广
  • wordpress 网站备案国外做测评的网站
  • 广州网站改版设计制作程序员源码网站
  • 村网站开设两学一做栏目建立外贸网站多少钱
  • 网站建设公司企业网站网络营销推广有哪些方法
  • 网站开发的项目总结如何开发软件程序
  • 创建公司网站免费西安网站建设第一品牌
  • 网站建设与开发英文文献快速制作效果图软件
  • 做网站 先备案么wordpress模板主题介绍
  • 企业网站写好如何发布百度云加速
  • 企业网站优化公司有哪些网站开发类的合同范本
  • 乌海网站开发网页制作网站发布教学设计
  • 龙岗网站建公司网站免费建站怎么样
  • 广告素材网站哪个比较好正能量免费下载