当前位置: 首页 > news >正文

深圳网站制作就找兴田德润零基础wordpress

深圳网站制作就找兴田德润,零基础wordpress,个人可以做网站导航的网站吗,旅游网站建设实施方案在Flink中状态主要分为三种: Operator State(算子状态)Keyed State(键控状态)Broadcast State(广播状态) 这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解 数据源 这里选用Socket作为Source输入,便于…

在Flink中状态主要分为三种:

  • Operator State(算子状态)
  • Keyed State(键控状态)
  • Broadcast State(广播状态)

这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解

  • 数据源
    这里选用Socket作为Source输入,便于测试
    ➜  ~ nc -lk 8888
    a
    b
    c
    k
    k
    k
    
  • 状态算子代码
    /**
    * @Description TODO 自定义状态MapFunc
    **/
    // 状态算子必须要实现对应的算子接口和CheckpointFunction接口
    class StateMapFunc implements MapFunction<String, String>, CheckpointedFunction{private ListState<String> strListState;/*** @Param o* @return String* @Description TODO map方法的正常处理逻辑**/@Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {throw new Exception("Task 异常");}// 将数据添加到状态存储器中strListState.add(s);Iterable<String> strings = strListState.get();StringBuilder builder = new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}/*** @Param functionSnapshotContext* @return void* @Description TODO 系统对状态数据做快照(持久化)会调用此方法, 用户使用此方法在持久化前对状态数据可以做一些操控**/@Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {System.out.println("快照生成, checkpointId: " + functionSnapshotContext.getCheckpointId());}/*** @Param functionInitializationContext* @return void* @Description TODO 算子任务在启动前会调用此方法,未用户状态数据进行初始化**/@Overridepublic void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {// 获取算子状态存储器OperatorStateStore operatorStateStore = functionInitializationContext.getOperatorStateStore();/*** ListStateDescriptor状态描述* 参数1:一个自定义名称* 参数2:存储的数据类型**/ListStateDescriptor<String> stateDescriptor = new ListStateDescriptor<>("demo", String.class);/*** 算子状态存储器, 只提供ListSate的形式(和Java中的List不是一回事)来存储状态数据* getListSate方法,会在Task失败后,task自动重启时,会帮助用户加载最近一次的快照数据,如果是job重启则不会加载**/strListState = operatorStateStore.getListState(stateDescriptor);}
    }
    
    要注意代码注释中的内容,getListState只作用于Task的自动重启,如果是整个Job重启时不生效的,如果是想Job重启后从重启前的State获取数据需要在Job提交时就指定checkpoint镜像文件.
  • 业务代码
      public class FlinkOperatorState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 将自定义的StateOperator传入SingleOutputStreamOperator<String> map = socketSource.map(new StateMapFunc());// 打印结果map.print();env.execute("Operator State");}
    }
    

具体的代码模板和API的介绍大概就这些内容,具体实践要根据业务逻辑而定.

http://www.yayakq.cn/news/375351/

相关文章:

  • 酒店预订网站建设网站开发要用什么工具软件
  • 网站栏目做树形结构图建设银行网站怎么能转账
  • 网站活动怎么做app开发公司tianpinkeji
  • 西安做网站的公司有网站的相对路径
  • 区块链资讯网站建设嵊州做网站
  • 计算机网络服务宁波正规seo企业优化
  • 外国人企业网站怎么做济南做网站互联网公司排名
  • 网站整站优化推广方案营销型网站四大功能
  • 中国制造网网站建设的优势网站后台怎么做外部链接
  • 网站怎样做网银支付电商怎么做?如何从零开始学做电商赚钱
  • 设计网站导航大全网站上线模板
  • 商业网站设计如何建造网站链接
  • 普宁旅游网站设计方案seo双标题软件
  • 如何做百度站长绑定网站网站实名认证功能怎么做
  • 做曖网站嘉兴网站网站建设
  • 网站建设加推广话术网站开发怎么根据设计稿的尺寸算图片高度
  • 免费域名网站重庆网上房地产网站
  • 莆田中建建设发展有限公司网站建设项目查询官网
  • 建立网站需要哪几个国企门户网站建设情况汇报
  • 广州网站建设开发正能量无遮掩图片全屏
  • 西乡网站建设百度最新版app下载安装
  • ftp两个网站子域名的绑定制作ppt的软件叫什么
  • php网站本地搭建网站开发证书
  • 小江高端网站建设公司网站被黑有不良信息 做笔录
  • 汕头网站建设推荐站内关键词排名优化软件
  • 网站首页推荐站长网站工具
  • 石家庄建设银行河北分行招聘网站做网站上海
  • wordpress网站文章形式做网站需要多钱
  • 佛山智唯网站建设京东商城网站风格
  • 网站开发的选题依据百度推广需要多少钱