当前位置: 首页 > news >正文

济南门户网站建设新建设电影院 网站

济南门户网站建设,新建设电影院 网站,怎样在网上做广告,上海最大的网络推广公司Flink写入Kafka两阶段提交 端到端的 exactly-once(精准一次) kafka -> Flink -> kafka 1)输入端 输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset) 2)Flink内…

Flink写入Kafka两阶段提交

端到端的 exactly-once(精准一次)

kafka -> Flink -> kafka

1)输入端

输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)

2)Flink内部

Flink 内部可以通过检查点机制保证状态和处理结果的 exactly-once 语义

3)输出端

两阶段提交(2PC)

写入 Kafka 的过程实际上是一个两段式的提交:处理完毕得到结果,写入 Kafka 时是基于事务的“预提交”;等到检查点保存完毕,才会提交事务进行“正式提交”

如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。

必须的配置

1)必须启用检查点

2)指定 KafkaSink 的发送级别为 DeliveryGuarantee.EXACTLY_ONCE

3)配置 Kafka 读取数据的消费者的隔离级别【默认kafka消费者隔离级别是读未提交,2PC第一阶段预提交数据也会被读到,下游消费者需要设置为读已提交

4)事务超时配置

【配置的事务超时时间 transaction.timeout.ms 默认是1小时,而Kafka 集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是15 分钟。在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。因此checkpoint 间隔 < 事务超时时间 < max的15分钟

代码实战

kafka -> Flink -> kafka【Flink处理kafka来源数据再输出到kafka】

public class KafkaEOSDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 【1】、启用检查点,设置为精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2.读取 kafkaKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("topic_1").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSource<String> kafkasource = env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource");/*3.写出到 Kafka精准一次 写入 Kafka,需要满足以下条件,【缺一不可】1、开启 checkpoint2、sink 设置保证级别为 精准一次3、sink 设置事务前缀4、sink 设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092")// 指定序列化器:指定 Topic 名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 【3.1】 精准一次,开启 2pc.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 【3.2】 精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("li-")// 【3.3】 设置事务超时时间.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();kafkasource.sinkTo(kafkaSink);env.execute();}
}

后续读取“ws”这个 topic 的消费者,要设置事务的隔离级别为“读已提交”

public class KafkaEOSConsumer {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 消费 在前面使用【两阶段提交】写入的 TopicKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("ws").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())// 作为 下游的消费者,要设置事务的隔离级别为 【读已提交】.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed").build();env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource").print();env.execute();}
}

处理程序以及消费程序如上设置才能真正实现端到端精准一次的保证。

http://www.yayakq.cn/news/413781/

相关文章:

  • 建一个分类信息网站页面设计美观
  • 网站建设技术服务费怎么入账搭建起什么样的平台
  • 网站图片加alt创意网站建设话术
  • 2015百度推广网站遭到攻击什么是seo搜索引擎优化
  • 怎么建一个购物网站成都需要网站制作
  • 网站的域名可以修改吗wordpress如何添加一级目录
  • 外贸类网站模板分类信息发布网站模板
  • 网站建设运营企划案东莞网站建设平台
  • 怎么做无损mp3下载网站温州建设网站哪家好
  • 辽宁省建设工程招投标协会网站做网站图片素材
  • 汕头网站建设和运营海口市公司网站建设
  • wordpress简单易懂的网站哈尔滨企业网站建设公司
  • 建设高端网站的公司行业电子商务网站有哪些
  • 网站开发 报价单外贸seo网站建设
  • 网站建设工作室wordpress不能添加用户
  • 社区类网站有哪些中国互联网排名
  • 郑州做网站推广地址广州的一起做网站
  • php做网站用html做吗定制软件开发流程图
  • 网站上传工具有什么买域名做网站
  • 免费自助建设网站软件设计公司排名
  • 广州木马网站建设公司怎么样建设银行在网站上开通短信提醒
  • 怎样上传网站到空间中搜seo
  • 淮安市建设工程安全监督站网站网络营销推广方法公司推荐
  • 长宁区网站建设网页制电子商务网站建设实训目的
  • 遵义高端网站建设通辽做家教的网站
  • 制作网站的心得体会wordpress浮窗
  • 自动网站建设网站开发 北京外包公司
  • 杨凌网站建设推广网站建设人员招聘
  • 河北网站建设价格低网站上漂亮的甘特图是怎么做的
  • 网站设计师培训学校北京故宫网站建设分析