当前位置: 首页 > news >正文

xp做网站通辽做网站制作

xp做网站,通辽做网站制作,怎么用上线了做网站,轻抖云网络营销推广背景 FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码 punctuated 水位线发送源码解析 1.首先KafkaFetcher中的runFetchLoop方法 public…

背景

FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码

punctuated 水位线发送源码解析

1.首先KafkaFetcher中的runFetchLoop方法

public void runFetchLoop() throws Exception {try {// kick off the actual Kafka consumerconsumerThread.start();while (running) {// this blocks until we get the next records// it automatically re-throws exceptions encountered in the consumer threadfinal ConsumerRecords<byte[], byte[]> records = handover.pollNext();// get the records for each topic partitionfor (KafkaTopicPartitionState<T, TopicPartition> partition :subscribedPartitionStates()) {List<ConsumerRecord<byte[], byte[]>> partitionRecords =records.records(partition.getKafkaPartitionHandle());
// 算子任务消费的每个分区都调用这个方法partitionConsumerRecordsHandler(partitionRecords, partition);}}} finally {// this signals the consumer thread that no more work is to be doneconsumerThread.shutdown();}

2.查看partitionConsumerRecordsHandler方法处理当前算子任务对应的每个分区的水位线

    protected void emitRecordsWithTimestamps(Queue<T> records,KafkaTopicPartitionState<T, KPH> partitionState,long offset,long kafkaEventTimestamp) {// emit the records, using the checkpoint lock to guarantee// atomicity of record emission and offset state updatesynchronized (checkpointLock) {T record;while ((record = records.poll()) != null) {long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);// 发送kafka记录到下游算子sourceContext.collectWithTimestamp(record, timestamp);// this might emit a watermark, so do it after emitting the record// 处理分区的水位线,记录这个分区的水位线,并在满足条件时更新整个算子任务的水位线partitionState.onEvent(record, timestamp);}partitionState.setOffset(offset);}}```3.处理每个分区的水位线```javapublic void onEvent(T event, long timestamp) {watermarkGenerator.onEvent(event, timestamp, immediateOutput);}public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next =wms.checkAndGetNextWatermark(event, eventTimestamp);if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}其中 output.emitWatermark(new Watermark(next.getTimestamp()));对应方法如下public void emitWatermark(Watermark watermark) {long timestamp = watermark.getTimestamp();// 更新每个分区对应的水位线,并且更新boolean wasUpdated = state.setWatermark(timestamp);// if it's higher than the max watermark so far we might have to update the// combined watermark 这个表明这个算子任务的最低水位线,也就是算子任务级别的水位线,而不是分区级别的了if (wasUpdated && timestamp > combinedWatermark) {updateCombinedWatermark();}}//每个分区水位线的更新如下public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;this.watermark = Math.max(watermark, this.watermark);return updated;}       

4.最后是发送算子任务级别的水位线的方法

private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit thatif (!hasOutputs) {return;}if (allIdle) {underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}

你可以看这个流程,是不是意味着如果使用Punctuated的方式,是不支持Idle空闲时间的?–答案是的

http://www.yayakq.cn/news/907806/

相关文章:

  • 网站开发流程ppt包就业的培训机构
  • 搬瓦工如何搭建做网站电商网站开发人员人数
  • seo站群优化技术电商培训网站
  • 数据分析案例网站如何制作公司宣传片
  • 丘受网站谁做的网球吧如何打死网站
  • 广州哪个公司做网站用vs网站开发
  • 获取网站访客信息桓台网站建设
  • app软件开发就是网站开发吗集客营销软件官方网站
  • 陇西做网站的广告店软文营销广告案例
  • 平面设计网站首页深圳在线问诊平台
  • 英语培训东莞网站建设资源分享网站怎么做
  • 济南网络安全公司怎么做优化网站排名
  • 门户网站php源码运城购物网站开发设计
  • 医疗企业网站模板python培训机构
  • 正规网站设计制作公司国家重点项目建设库网站
  • 电商网站搭建网站开发岗位简介
  • 酒水食品做的好网站济宁哪里做网站最便宜
  • 郑州市住房和城乡建设厅网站廊坊百度推广排名优化
  • 网站建设wordpress比较太原网站开发哪家好
  • 信用 网站 建设方案建立网站心得
  • 内网站做映射8个页面的网站怎么做
  • 报网站开发培训班佛山网站推广软件
  • 更改wordpress登陆页面logo广州四楚seo顾问
  • 化工网站开发百度搜索大数据怎么查
  • 苏州吴中网站建设做网站意义和目的
  • 广州建站平台哪家好长沙景点门票价格表
  • 网站建设网络拓扑ios开发教程
  • 织梦dedecms网站简略标题shorttitle的使用方法网络技术专业学什么
  • 网站更换ico文件位置怎么在广西建设厅网站注销c证
  • 织梦如何新建网站wordpress+mip手机主题