当前位置: 首页 > news >正文

椒江做网站的公司网站结构分类

椒江做网站的公司,网站结构分类,上弦 网站建设,安徽美丽乡村建设网站版本 flink 1.16.0kafka 2.3 流程描述: flink利用KafkaSource,读取kafka的数据,然后经过一系列的处理,通过KafkaSink,采用 EXACTLY_ONCE 的模式,将处理后的数据再写入到新的topic中。 问题描述&#xff1…

版本

  • flink 1.16.0
  • kafka 2.3

流程描述:

flink利用KafkaSource,读取kafka的数据,然后经过一系列的处理,通过KafkaSink,采用 EXACTLY_ONCE 的模式,将处理后的数据再写入到新的topic中。

问题描述:

数据写入到新的topic后,过上几分钟的时间,利用工具offset explorer观察对应topic的数据量,显示为0。
刚写入没多久的数据消失了 ???大写的懵 ???

定位问题:

  • 首先查看kafka的日志:

在这里插入图片描述

  • 阅读flink 官方文档 kafkaSink的介绍:

DeliveryGuarantee.EXACTLY_ONCE: In this mode, the KafkaSink will write
all messages in a Kafka transaction that will be committed to Kafka on
a checkpoint. Thus, if the consumer reads only committed data (see
Kafka consumer config isolation.level), no duplicates will be seen in
case of a Flink restart. However, this delays record visibility
effectively until a checkpoint is written, so adjust the checkpoint
duration accordingly. Please ensure that you use unique
transactionalIdPrefix across your applications running on the same
Kafka cluster such that multiple running jobs do not interfere in
their transactions! Additionally, it is highly recommended to tweak
Kafka transaction timeout (see Kafka producer transaction.timeout.ms)»
maximum checkpoint duration + maximum restart duration or data loss
may happen when Kafka expires an uncommitted transaction.

  • 翻译过来的意思大概就是:

在EXACTLY_ONCE这种模式下,KafkaSink在事务中写入所有的消息,这些消息在checkpoint上提交给kafka。因此,在flink重启的情况下,如果消费者值读取提交的数据,不会看到重复的数据。缺点就是延迟记录可见性,知道写入检查点为止。强烈建议调整kafka的事务超时时间(见Kafka producer transaction.timeout.ms),超时时间要大于【最大检查点持续时间+最大重启持续时间】,否则当Kafka过期未提交的事务时可能会发生数据丢失。

  • 阅读kafka的官网介绍:

Producer Configs:
transaction.timeout.ms:60000(默认值)

参数描述:
The maximum amount of time in ms that the transaction coordinator will
wait for a transaction status update from the producer before
proactively aborting the ongoing transaction.If this value is larger
than the transaction.max.timeout.ms setting in the broker, the request
will fail with a InvalidTransactionTimeout error.

Broker Configs
transaction.max.timeout.ms:900000(默认值)

参数描述:
The maximum allowed timeout for transactions. If a client’s requested
transaction time exceed this, then the broker will return an error in
InitProducerIdRequest. This prevents a client from too large of a
timeout, which can stall consumers reading from topics included in the
transaction.

  • 最后排查
    在flink中设置的超时时间违反了kafka producer对应的参数规定。

解决问题

在kafkaSink的配置中,加入

Properties properties = new Properties();
// 根据上面的介绍自己计算这边的超时时间,满足条件即可
properties.setProperty("transaction.timeout.ms","900000");KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers(bootstrapServers).setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic(sinkTopic).setValueSerializationSchema(new SimpleStringSchema()).build()).setKafkaProducerConfig(properties).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("flink-xhaodream-").build();

总结

在使用现有框架和工具的时候,往往只是懂得怎么用,具体底层的逻辑、原理,了解的很少。往往只有真正理解了原理,遇到了问题,才会更快、更准确的定位问题、解决问题。

http://www.yayakq.cn/news/112913/

相关文章:

  • 做网站怎么加视频中职网络营销专业
  • 显示网站正在维护是什么情况绿色主色调的网站
  • 宁波网站制作好公司零基础网页制作培训
  • 普洱市建设局网站wordpress 超级搜索
  • 学建站wordpress眉山 网站开发
  • 聊城市城乡建设部网站查询做出口网站
  • 网页建站网站百度推广搜索排名
  • 做网站有现成的程序重庆排名优化整站优化
  • 河南工程建设信息网站手机优化网站建设
  • 企业网站建设大概多少钱网站 字体
  • 网站备案核验点商盈网站建设
  • 上线了免费建网站代刷网站怎么做
  • 盐城公司做网站公司网站要使用我个人的信息备案
  • 网站建设公司海外东莞网站建设方案表
  • 网站推广托管dede模板网站如何搭建
  • wordpress的标签有什么用广东seo推广
  • php网站打开速度慢网站是怎么做的
  • 起飞页做网站步骤网站开发公司多少钱
  • 山东省安全双体系建设网站地址wordpress怎么添加邮箱
  • 建设网站的新闻做中医药网站有前景吗
  • 房山建站公司vi设计收费标准
  • 潍坊哪家网站制作公司好网站制作 东莞
  • 淘宝联盟登记新网站网站架构搭建
  • 装饰设计的变形手法有哪些seo编辑是干什么的
  • 企业内网搭建要多少钱厦门seo公司网站
  • 上海大型网站ps兼职做网站
  • 网站监控的软件怎么做北京建网站
  • 网站建设接私活平台长沙知名网站
  • 富阳做网站公司网站线上推广方式
  • 做美图 网站网站开发电脑配置推荐