当前位置: 首页 > news >正文

和龙建设局网站WordPress 蜘蛛池

和龙建设局网站,WordPress 蜘蛛池,上海外贸平台,自媒体培训文章目录 1、基本处理函数2、定时器和定时服务3、KeyedProcessFunction下演示定时器4、process重获取当前watermark 前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图: 在Flink…

文章目录

  • 1、基本处理函数
  • 2、定时器和定时服务
  • 3、KeyedProcessFunction下演示定时器
  • 4、process重获取当前watermark

前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图:
在这里插入图片描述

在Flink底层,可以不定义具体是什么算子,而只是一个统一的处理(process)操作,里面可以自定义逻辑。即图中底层的处理函数层。从下到上,封装越来越重,使用越来越简单。前面用的map等都是Flink封装好的,底层则是process。当现有的算子无法实现需求时,直接用process就行,最底层,最灵活,逻辑你自己开发就行,自定义处理逻辑!!!

1、基本处理函数

处理函数的使用和前面的转换算子一样,基于DataStream对象调用即可:

stream.process(new MyProcessFunction())
  • ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction

  • ProcessFunction的两个泛型:I表示Input,是输入的数据类型;O即Output,是处理完成之后输出的数据类型

  • ProcessFunction抽象类有抽象方法processElement须重写,以及非抽象方法onTimer

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}...}

抽象方法 processElement:

  • 定义处理元素的逻辑
  • 流中的每个元素都会调用一次这个房啊
  • 三个形参分别为:流中数据value自身、上下文对象ctx获取相关信息、收集器out往下游发处理完的数据

非抽象方法onTimer:

  • 定时器触发时调用这个方法
  • 注册定时器即设一个闹钟,onTimer则是闹钟响了以后要做的事
  • onTimer是基于时间线的一个回调方法
  • onTimer的三个形参分别为:时间戳(timestamp),上下文(ctx),以及收集器(out)

最初的DataStream流在经过不同的操作后会得到不同类型的流,比如keyBy后的KeyedStream,window后的WindowedStream。对于这些不同类型的流,其实都可以直接调用.process()方法进行自定义处理,不过process重载,传参是不同类型的ProcessFunction

关于处理函数的分类:

  • 在什么情况下调用process方法,就传入一个什么类型的ProcessFunction
  • 具体类型,在process下Ctrl+P查看传参提示就行,比如DataStream下传ProcessFunction,按键分区后得到KeyedStream传KeyedProcessFunction

2、定时器和定时服务

ProcessFunction的上下文对象Context有timerService()方法,可返回一个TimerService对象。TimerService是Flink实现定时功能的关键。其常用方法:

  • 获取当前的处理时间
long currentProcessingTime();
  • 获取当前的水位线(事件时间)
long currentWatermark();
  • 注册处理时间定时器,当处理时间超过time时触发
void registerProcessingTimeTimer(long time);
  • 注册事件时间定时器,当水位线超过time时触发
void registerEventTimeTimer(long time);
  • 删除触发时间为time的处理时间定时器
void deleteProcessingTimeTimer(long time);
  • 删除触发时间为time的处理时间定时器
void deleteEventTimeTimer(long time);

注意:

  • 只有在KeyedStream中才支持使用TimerService设置定时器的操作
  • TimerService会以键(key)和时间戳为标准,对定时器进行去重,即同样的key和时间戳下,定时器只会留一个,触发时onTimer只被调用一次

3、KeyedProcessFunction下演示定时器

事件时间下的定时器演示:定义一个5s的定时器,在水位线时间到达5s时触发

public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))  //乱序默认的水位生成器.withTimestampAssigner((element, ts) -> element.getTs() * 1000L)  //时间戳提取);KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {/*** 来一条数据调用一次* @param value  每条数据* @param ctx 上下文对象* @param out 采集器* @throws Exception*/@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();// 获取定时器服务对象TimerService timerService = ctx.timerService();// 数据中提取出来的事件时间Long currentEventTime = ctx.timestamp(); //注册定时任务,水位线被推到5s时触发timerService.registerEventTimeTimer(5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");/*** 时间进展到定时器注册的时间,调用该方法* @param timestamp 当前时间进展,就是定时器被触发时的时间* @param ctx       上下文* @param out       采集器* @throws Exception*/@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey = ctx.getCurrentKey();System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");}});process.print();env.execute();}
}

运行:注意时间戳8s时水位线为8s-3s-1ms < 5s,即当前最大事件时间 - 等待延迟时间 - 1ms,因此未触发,且同一个key,同一个定时时间,只有一个定时器生效:

在这里插入图片描述

看下不同key的效果,注意,水位线是多少和key没关系,s1,9,9进去,直接水位线变成9-3-1ms > 5s,三个定时器都触发

在这里插入图片描述

再用处理时间下的定时器:

public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {//...重复代码略,同上KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();TimerService timerService = ctx.timerService();//当前数据的处理时间long currentTs = timerService.currentProcessingTime();//定时器不用水位线为标杆,直接处理时间加5stimerService.registerProcessingTimeTimer(currentTs + 5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");}//...重复代码略,同上
}

运行:

在这里插入图片描述

4、process重获取当前watermark

还是用上面的socket流,但process逻辑不玩定时器,验证下watermark:

//...重复代码略,同上@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {     // 获取 process的 当前watermarklong currentWatermark = timerService.currentWatermark();System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);}

此时可以看到,s1,1,1进去,水位线本应为1000ms-3000ms-1ms = -2001,但通过timerService获取到的却是起始值,就那个Long.MIN,直到s1,5进去,才获取到-2001,依次往下,都差一个

在这里插入图片描述

在process重获取当前的watermark,显示的是上一次的watermark,因为process还没接收到这条数据对应的生成的新的watermark。关键点:watermark也是一个数据,要跟着流中对应的那个数据往下游流。

在这里插入图片描述
在这里插入图片描述

上图示意了为什么s5,5获取到的水位线为-2001,因为此时process还没接收到这条数据对应的生成的新的watermark(1999还在process框外,框内只有一个-2001)

http://www.yayakq.cn/news/449904/

相关文章:

  • 电商平台网站开发文档注册网站卖东西
  • 应用网站如何做wordpress asp.net
  • 西安做网站电话四川中天建设有限公司网站
  • ps做网站好看的logo外包网络安全
  • 发果怎么做视频网站制作动画网站模板
  • 网站建设开发综合实训小结python做网站开发
  • 网站建设与管理题库京东网站建设案例论文
  • 专业网站设计是什么购物网站二级页面模板
  • 太原网站建站模板新房地产网站开发
  • 能免费创建网站吗朝阳市做网站
  • 福州开发企业网站本科学历提升
  • 网站备案信息传wordpress自带的会员中心
  • wordpress上传错误500南宁网站排名优化公司
  • 网站建设实习困难废品回收网站怎么做网站优化
  • 九九人才网赣州招聘手机优化大师官方免费下载
  • 山西网站建设营销qq深圳网站建设 东莞网站建设
  • 重庆网站商城微信朋友圈广告投放价格表
  • 中企动力双语网站绥中建设厅网站
  • 如何写网站优化目标南海小程序网站开发
  • 怎么用node做网站阿里云空间如何安装wordpress
  • 网站建设技术文档网站防止恶意注册
  • 网站常用的js效果大秀
  • asp.net 做网站文章是怎么存储的去哪里做网站
  • 注册域名不建设网站网页设计你若安好便是晴天作业
  • 网站内容建设的布局岳阳建设局网站
  • 免费的活动策划网站适合做设计公司的名字
  • php ajax网站开发典型实例 pdf信阳做网站公司汉狮价格
  • 网站建设条款Wordpress news模板
  • wordpress编辑网站的链接是中文小程序制作流程
  • 高校门户网站建设方案接单干活的平台