当前位置: 首页 > news >正文

网站建设实习小结济南建设工程交易网官网

网站建设实习小结,济南建设工程交易网官网,什么网站百度收录快,建一个简单的公司官网需要多少钱目录 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 自定义序列化器 4、分区器 默认分区规则 自定义分区器 5、生产者拦截器 作用 自定义拦截器 6、生产者原理解析 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 在Kafka中保存的数…

目录

1、消息生产流程

2、生产者常见参数配置

3、序列化器

基本概念

自定义序列化器

4、分区器

默认分区规则

自定义分区器

5、生产者拦截器

作用

自定义拦截器

6、生产者原理解析


1、消息生产流程

2、生产者常见参数配置

3、序列化器

基本概念

  • 在Kafka中保存的数据都是字节数组。
  • 消息发送前,需要将消息序列化为字节数组进行发送。
  • 生产者通过key.serializer和value.serializer指定key和value的序列化器。
  • Kafka使用org.apache.kafka.common.serialization.Serializer接口定义序列化器。
  • Kafka已实现的序列化器有:ByteArraySerializer、ByteBufferSerializer、BytesSerializer、DoubleSerializer、FloatSerializer、IntegerSerializer、StringSerializer、LongSerializer、ShortSerializer。

自定义序列化器

实现org.apache.kafka.common.serialization.Serializer<T>接口,并实现其中的serializer方法。

@Data
public class User {private Integer userId;private String username;
}public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// do nothing}@Overridepublic byte[] serialize(String topic, User data) {try {// 如果数据是null,则返回nullif (data == null) return null;Integer userId = data.getUserId();String username = data.getUsername();int length = 0;byte[] bytes = null;if (null != username) {bytes = username.getBytes("utf-8");length = bytes.length;}// 第一个4字节存储userId的值// 第二个4字节存储username字节数组的长度int值// 第三个length长度,存储username序列化之后的字节数组ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);buffer.putInt(userId);buffer.putInt(length);buffer.put(bytes);return buffer.array();} catch (UnsupportedEncodingException e) {throw new SerializationException("序列化数据异常");}}@Overridepublic void close() {// do nothing}
}

4、分区器

默认分区规则

KafkaProducer.partition();DefaultPartitioner.partition();

  1. 如果record提供了分区号,则使⽤record提供的分区号
  2. 如果record没有提供分区号,则使⽤key的序列化后的值的hash值对分区数量取模
  3. 如果record没有提供分区号,也没有提供key,则使⽤轮询的⽅式分配分区号。

自定义分区器

实现org.apache.kafka.clients.producer.Partitioner接口,并实现其中的partition方法。

在生产者参数中通过配置partitioner.class指定自定义分区器。

/*** 自定义分区器*/
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 此处可以计算分区的数字。// 我们直接指定为2return 2;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

5、生产者拦截器

作用

        在发送消息前,或者在执行回调逻辑前,对消息做一些定制化的处理,比如修改消息,打印消息日志等。此外,Producer允许设置多个拦截器从而形成一条拦截器链,Producer将按照指定顺序调用它们。

自定义拦截器

        自定义拦截器实现org.apache.kafka.clients.producer.ProducerInterceptor接口,并实现其中的onSend()、onAcknowledgement()、close()接口。其中:

  • onSend(ProducerRecord):Producer 确保在消息被序列化前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修 改消息所属的topic和分区,否则会影响⽬标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤, 并且通常都是在Producer回调逻辑触发之前。
  • close:关闭Interceptor,主要⽤于执⾏⼀些资源清理⼯作。

        在生产者参数中通过配置ProducerConfig.INTERCEPTOR_CLASSES_CONFIG指定自定义拦截器。

public class Interceptor<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);@Overridepublic ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {System.out.println("拦截器---go");// 此处根据业务需要对相关的数据作修改String topic = record.topic();Integer partition = record.partition();Long timestamp = record.timestamp();KEY key = record.key();VALUE value = record.value();Headers headers = record.headers();// 添加消息头headers.add("interceptor", "interceptor".getBytes());ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY, VALUE>(topic, partition, timestamp, key, value, headers);return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("拦截器---back");if (exception != null) {// 如果发生异常,记录在日志中LOGGER.error(exception.getMessage());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

6、生产者原理解析

以上内容为个人学习理解,如有问题,欢迎在评论区指出。

部分内容截取自网络,如有侵权,联系作者删除。

http://www.yayakq.cn/news/935381/

相关文章:

  • 游戏微网站模板sem广告投放是做什么的
  • 模仿淘宝详情页做网站wordpress wind
  • 网站建设五合一是指什么wordpress 本地数据库
  • 做便民工具网站更新失败wordpress修改页面
  • 上海建设公司网站一台电脑如何做网站
  • 站酷网首页教做幼儿菜谱菜的网站
  • 网站制作公司挣钱吗网站建站去哪找客户
  • 绍兴网站制作工具菏泽网站开发公司
  • qq网站临时会话关键词排名优化易下拉技术
  • 视频网站 备案网站开发的目的相关书籍
  • 网站建设属于IT城阳网站建设公司
  • 电子商务网站建设的认识公司网站图片传不上去
  • 怎么建立一个博客网站网站建设 文章
  • 建设厅报名网站网站上线 备案
  • 赣州市住房和城乡建设局网站升学历的正规机构官网
  • 勒流网站制作阿里巴巴国际站怎么注册
  • 建设部网站 合同格式hhvm wordpress 空白
  • 一个公司网站多少钱提高基层治理效能
  • 网站设计与网站建设书店360网站建设商家
  • 农业行业网站建设杭州网站搜索排名
  • 百度免费建立网站吗织梦网站更改主页链接
  • 事业单位考试网站orchard可以做哪些网站
  • 山西大川建设有限公司网站好玩的电脑网页游戏
  • 展示型网站设计案例建分类网站得花多少钱
  • 昆山网站制作哪家好哪里可以上传自己的php网站
  • 物流网站建设方案范文做网站平台公司
  • 梧州网站建设厂家网站登录密码保存在哪里设置
  • 建设工程交易中心网站收费标准网站建设图片流程图
  • 哈尔滨优化网站公司在线设计发型
  • 广元市建设银行网站小程序自助建站