当前位置: 首页 > news >正文

找个为公司做网站的专业外贸网站建设公司价格

找个为公司做网站的,专业外贸网站建设公司价格,网站vi设计公司,郑州网站建设培训班背景 使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换,而且这种用法非常常见,根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换,本文就来简单举个例子说明下RichFlatMapFunction的使用方法 RichFl…

背景

使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换,而且这种用法非常常见,根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换,本文就来简单举个例子说明下RichFlatMapFunction的使用方法

RichFlatMapFunction使用示例

下面的例子的输入是不用name下的count数量值,当本次name的数量和前一次name的数量相差超过配置的阈值100时,打印出来一条告警日志,详细代码如下:

package wikiedits.func.state;import java.util.Objects;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;/*** Tuple2<String, Integer> 是输入的数据类型 String 是监控到异常值后的输出数据类型*/
public class MyRichFlatMapFunction extends RichFlatMapFunction<Tuple2<String, Integer>, String> {// 键值分区状态,对应每个name一个值ValueState<StateEntity> nameState;@Overridepublic void open(Configuration parameters) throws Exception {// 创建一个键值分区状态ValueStateDescriptor<StateEntity> state = new ValueStateDescriptor<>("nameState", StateEntity.class);nameState = getRuntimeContext().getState(state);}@Overridepublic void flatMap(Tuple2<String, Integer> input, Collector<String> collector) throws Exception {// 判断状态值是否为空(状态默认值是空)if (Objects.isNull(nameState.value())) {StateEntity sFalg = new StateEntity(input.f0, input.f1);nameState.update(sFalg);return;}// 和上一次的状态值比较StateEntity value = nameState.value();if (Math.abs(value.count - input.f1) > 100) {collector.collect(new String("监控到异常值,名称: " + input.f0 + " 上次的值:" + value + " 本次的值:" + input));}value.setName(input.f0);value.setCount(input.f1);// 更新状态值nameState.update(value);}}package wikiedits.func.state;import java.text.SimpleDateFormat;
import java.util.Date;import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class RichFlatMapFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置数据源,一共三个元素DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {for (int i = 1; i < Integer.MAX_VALUE; i++) {// 只有XXX,YYY,ZZZ三种nameString name = (0 == i % 3) ? "XXX" : ((i % 3 == 1) ? "YYY" : "ZZZ");int count = RandomUtils.nextInt(0, 1000);// 使用当前时间作为时间戳long timeStamp = System.currentTimeMillis();// 发射一个元素,并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, count), timeStamp);// 每发射一次就延时1秒Thread.sleep(5000);}}@Overridepublic void cancel() {}});dataStream.keyBy((f) -> {return f.f0;}).flatMap(new MyRichFlatMapFunction()).print();env.execute();}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}}

结果
在这里插入图片描述

http://www.yayakq.cn/news/375478/

相关文章:

  • 诸城哪有做公司网站的仿uehtml WordPress
  • 如何创建商业网站网站建设设计风格描述
  • 外链网站推荐网站怎么对接微信支付宝
  • 网页设计和网站建设毕业设计网站建设多少钱宋柯
  • 物流企业网站建设规划书wordpress主题怎么使用教程
  • 网站百度不到验证码怎么办做文学网站算不算开公司
  • 宁波做网站优化多少钱wordpress 综合主题
  • 彩票网站什么做成都网站建设电话
  • 湖州城市投资建设集团网站宁波企业建网站报价
  • 北京华诚传媒有限公司官方网站苏州响应式网站建设
  • 网站建设的pest分析网络优化内容有哪些
  • 旅游公司的网站怎么做广东队对阵广州队
  • 丹阳网站建设哪家好深圳外贸网站建设公司价格
  • 音乐网站开发需求文档模板免费做爰网站
  • 北京网站设计网站公司网店代运营公司哪家好
  • 望城门户网站广告机自建站模板
  • 白石洲附近做网站公司房地产营销策略有哪些
  • 上海网站搜索排名提升学历的好处有哪些
  • 搭建企业网站宽带多大怎么建个公司网站
  • 做网站市场分析辽宁省建设厅特种工查询网站
  • 河南智慧团建网站登录做一个浏览器需要多少钱
  • 毕业设计网站成品酒店宣传软文
  • 个人网站建设计划表品牌网站源码asp
  • 体育馆做网站公司游戏推广员怎么做
  • 网站flash引导页下载网站建设规划方案ppt模板
  • 安徽网站建设哪家有网站内容与目录结构图
  • 济南网站哪家做的好无线网络网站dns解析失败
  • 成都网站建设好多科技做海报的专业网站
  • 哪里的网络推广培训好如何对网站做进一步优化
  • 单纯python能完成网站开发吗wordpress音乐加载慢