当前位置: 首页 > news >正文

网站推广方法有几个链接制作

网站推广方法有几个,链接制作,seo优化网站教程,网站负责人核验现场拍摄照片电子件背景 FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码 punctuated 水位线发送源码解析 1.首先KafkaFetcher中的runFetchLoop方法 public…

背景

FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码

punctuated 水位线发送源码解析

1.首先KafkaFetcher中的runFetchLoop方法

public void runFetchLoop() throws Exception {try {// kick off the actual Kafka consumerconsumerThread.start();while (running) {// this blocks until we get the next records// it automatically re-throws exceptions encountered in the consumer threadfinal ConsumerRecords<byte[], byte[]> records = handover.pollNext();// get the records for each topic partitionfor (KafkaTopicPartitionState<T, TopicPartition> partition :subscribedPartitionStates()) {List<ConsumerRecord<byte[], byte[]>> partitionRecords =records.records(partition.getKafkaPartitionHandle());
// 算子任务消费的每个分区都调用这个方法partitionConsumerRecordsHandler(partitionRecords, partition);}}} finally {// this signals the consumer thread that no more work is to be doneconsumerThread.shutdown();}

2.查看partitionConsumerRecordsHandler方法处理当前算子任务对应的每个分区的水位线

    protected void emitRecordsWithTimestamps(Queue<T> records,KafkaTopicPartitionState<T, KPH> partitionState,long offset,long kafkaEventTimestamp) {// emit the records, using the checkpoint lock to guarantee// atomicity of record emission and offset state updatesynchronized (checkpointLock) {T record;while ((record = records.poll()) != null) {long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);// 发送kafka记录到下游算子sourceContext.collectWithTimestamp(record, timestamp);// this might emit a watermark, so do it after emitting the record// 处理分区的水位线,记录这个分区的水位线,并在满足条件时更新整个算子任务的水位线partitionState.onEvent(record, timestamp);}partitionState.setOffset(offset);}}```3.处理每个分区的水位线```javapublic void onEvent(T event, long timestamp) {watermarkGenerator.onEvent(event, timestamp, immediateOutput);}public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next =wms.checkAndGetNextWatermark(event, eventTimestamp);if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}其中 output.emitWatermark(new Watermark(next.getTimestamp()));对应方法如下public void emitWatermark(Watermark watermark) {long timestamp = watermark.getTimestamp();// 更新每个分区对应的水位线,并且更新boolean wasUpdated = state.setWatermark(timestamp);// if it's higher than the max watermark so far we might have to update the// combined watermark 这个表明这个算子任务的最低水位线,也就是算子任务级别的水位线,而不是分区级别的了if (wasUpdated && timestamp > combinedWatermark) {updateCombinedWatermark();}}//每个分区水位线的更新如下public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;this.watermark = Math.max(watermark, this.watermark);return updated;}       

4.最后是发送算子任务级别的水位线的方法

private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit thatif (!hasOutputs) {return;}if (allIdle) {underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}

你可以看这个流程,是不是意味着如果使用Punctuated的方式,是不支持Idle空闲时间的?–答案是的

http://www.yayakq.cn/news/344681/

相关文章:

  • 自己搭建一个网站全自动营销软件
  • 网站建设工程产品设计怎么写
  • 关于 公司网站建设的通知手机网站图片优化
  • 网站上做旅游卖家要学什么条件中国企业报集团是央企吗
  • html静态网站开发自我介绍银川网站制作报价
  • 服装网站的建设背景二月网站建设南宁
  • 临沂网站建设临沂个人门户网站备案流程
  • 大连网站开发乛薇世界500强企业中国有几家
  • 做企业网站哪家公司好thinkphp 网站管理
  • 建设地区网站建议郑州注册公司代理记账
  • 在线网站生成器如何评价一个网站做的好不好
  • 甘肃建设项目审批权限网站建设银行假网站首页
  • 网站设计的流程是怎样的营销网站首页设计
  • 西安百度竞价托管关键词优化排名seo
  • 焦点网站设计网站建设存在风险
  • 手机网站导航特效电商网站开发设计方法
  • 网站开发团队介绍温州网站制作网站
  • 温州网站优化定制wordpress移动应用
  • 如果网站曾被挂木马大连网站建设佳熙科技
  • 湛江仿站定制模板建站网站建设记账做什么科目
  • 做商城网站的项目背景建设京东物流网站的目标是什么
  • 国外网站模版网站流量显示
  • 企业网站建设公司怎么做html门户网站模板
  • 网站地图格式超酷个人网站欣赏
  • 建立wordpress网站吗建筑人才网与厦门人才网的中级工程师证书的区别
  • 微网站自己怎么做的吗南昌中企动力做的网站怎么样
  • 网站群建设技术方案微信微商城平台
  • 企业网站多少钱建立免费网站
  • 杭州富阳网站建设公司百度一下网页版浏览器百度
  • 哈尔滨网络宣传与网站建设如何在微信上开发小程序