当前位置: 首页 > news >正文

婚礼策划网站建设如何做自己的淘宝网站

婚礼策划网站建设,如何做自己的淘宝网站,数字化营销与传统营销的区别,织梦发布网站Kafka 位移提交自动提交手动提交Consumer 的消费位移 : 记录 Consumer 下一条消息的消费位移 如 : Consumer 已消费 5 条消息 (位移: 0 - 4) , 此时 Consumer 位移 5 : 指向下一条消息的位移 提交位移 (Committing Offsets) : Consumer 向 Kafka 汇报位移数据 Consumer 能同…

Kafka 位移提交

  • 自动提交
  • 手动提交

Consumer 的消费位移 : 记录 Consumer 下一条消息的消费位移

  • 如 : Consumer 已消费 5 条消息 (位移: 0 - 4) , 此时 Consumer 位移 = 5 : 指向下一条消息的位移

提交位移 (Committing Offsets) : Consumer 向 Kafka 汇报位移数据

  • Consumer 能同时消费多个分区的数据,Consumer 要维护每个分区提交各自的位移数据
  • 当 Consumer 重启后,能从之前位移继续消费,避免重新消费整个消息

Consumer API 的提交位移的方法 :

  • 从用户分 : 自动提交 , 手动提交
  • 从 Consumer 分 : 同步提交 , 异步提交
  • 自动提交 : Consumer 在后台提交位移,用户无需操作
  • 手动提交 : 用户提交位移,Consumer 不管
提交位移自动提交配置enable.auto.commit = true
手动提交同步提交KafkaConsumer.commitSync
异步提交KafkaConsumer.commitAsync
细化位移提交commitSync(Map<TopicPartition, OffsetAndMetadata>)
commitAsync(Map<TopicPartition, OffsetAndMetadata>)

自动提交

Consumer 参数 :

  • enable.auto.commit = true : 自动提交位移
  • auto.commit.interval.ms (默认值是 5 秒) : Kafka 每 5 秒自动提交一次位移

自动提交位移 :

  • 可能出现重复消费
  • 例子:Consumer 每 5 秒自动提交一次位移。提交位移 3 秒后出现 Rebalance。在 Rebalance 后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据,在 Rebalance 发生前 3 秒消费的所有数据都会重新消费

设置自动提交位移 :

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
}

手动提交

enable.auto.commit = false : 手动提交位移

手动提交位移 :

  • 好处 : 更灵活,能把控位移提交的时机和频率
  • 缺点 : 用 commitSync() 时,Consumer 处于阻塞状态,直到 Broker 返回提交结果,影响整个应用程序的 TPS

commitSync() :

while (true) {// 返回最新位移。一直等位移提交后才返回 (同步操作)ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息try {consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常}
}

commitAsync() :

  • 异步操作,会立即返回,不会阻塞,不影响 Consumer 的 TPS
  • 用回调函数 (callback) 实现提交后的逻辑,如 : 记录日志或处理异常
  • 无法自动失败重试
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息consumer.commitAsync((offsets, exception) -> {if (exception != null)handle(exception);});
}

异步无阻塞式 :

  • 用 commitSync 自动重试避免瞬时错误,如 : 网络的瞬时抖动,Broker 端 GC
  • 异步处理,不影响 TPS
// 实现异步无阻塞式的位移管理,保证 Consumer 位移的正确性
try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息commitAysnc(); // 使用异步提交规避阻塞}
} catch (Exception e) {handle(e); // 处理异常
} finally {try {consumer.commitSync(); // 最后一次提交使用同步阻塞式提交} finally {consumer.close();}
}

更精细的位移管理 :

  • commitSync(Map<TopicPartition, OffsetAndMetadata>)
  • commitAsync(Map<TopicPartition, OffsetAndMetadata>)
  • 参数 : Map 对象 : 键 = TopicPartition (消费的分区),值 = OffsetAndMetadata 对象 (位移数据)
// 创建 Map 对象,保存 Consumer 消费要提交的分区位移
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
//...
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record: records) {process(record);  // 处理消息// 构造要提交的位移值offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1);// 每 100 条消息提交一次位移if(count % 100 == 0{consumer.commitAsync(offsets, null); // 回调处理逻辑是 null}count++;}
}
http://www.yayakq.cn/news/879607/

相关文章:

  • 哪个网站是做包装材料珍珠棉包管国家建筑工程网查询证书
  • 被骗去国外做网站网站推广东莞网络推广网络推广
  • 佛山用户网站建站品牌设计公司深圳
  • 公司做网站需要哪些资料企业管理平台软件
  • 南阳商都网站做网站大数据获客
  • 一般网站开发公司新办公司流程及资料
  • 深圳app客户端做网站聚名网app下载
  • 湘潭网站建设公司有哪些软件开发工程师需要什么证书
  • 广州做网站厉害的公司长春标准网站建设
  • 南宁网站建设公司业绩wordpress打折插件
  • 网站开发app开发培训东莞专业建站公司费用
  • 北京网站建设 网站制作php 网站开发案例教程
  • 外贸小家电网站推广品牌营销增长好牌子推荐
  • 医院手机网站直播带货平台
  • 做网站要花多少钱牡丹江47号公告
  • 怎么向谷歌提交网站制作网站合同需注意
  • 专做化妆品网站如何打开wordpress
  • 镇江建站建设银行面试经验网站
  • 南京市公共工程建设 中心网站星空无限传媒在线观看电视剧赘婿
  • 如何修改网站icowordpress没有页面模板
  • wordpress积分系统wordpress seo tdk
  • 苏州网站建设网站如何做医美机构网站观察分析
  • 陕西 工程建设 公司 网站好的做网站的
  • 成都网站设计报告书单人给一个公司做网站费用
  • 网站备案是每年一次吗湖南省城乡和住房建设厅官网
  • 加强企业网站建设作用已经买了域名怎么做网站
  • 湖南省公司注册网站aso推广
  • 怎么建一个购物网站做中英文网站公司
  • 自己电脑上做网站怎么使用源码徐州网站建设公司官网
  • 金龙网站哪里建设的网站关键词密度怎么计算的