当前位置: 首页 > news >正文

山姆超市网上购物网温州seo网络推广代理价格

山姆超市网上购物网,温州seo网络推广代理价格,南京科技网站设计多少钱,wordpress更改logo宽度Flink checkpoint Checkpoint是Flink实现容错机制最核心的功能,能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,从而将这些状态数据定期持久化存储下来,当Flink程序一…

Flink checkpoint

Checkpoint是Flink实现容错机制最核心的功能,能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。

  1. Checkpoint指定触发生成时间间隔后,每当需要触发Checkpoint时,会向Flink程序运行时的多个分布式的Stream Source中插入一个Barrier标记
  2. 当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录
  3. 每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐)
  4. 该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入
  5. 最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据

开启checkpoint

 

        //1.1 开启CKenv.enableCheckpointing(5000);env.getCheckpointConfig().setCheckpointTimeout(10000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint处理。

上面代码配置了执行Checkpointing的时间间隔为1分钟。

保存多个checkpoint
默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint

Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:

state.checkpoints.num-retained: 20

如果希望会退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现。

从checkpoint 恢复
如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个Checkpoint点,比如chk-860进行回放,执行如下命令

bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar
  • 所有的Checkpoint文件都在以Job ID为名称的目录里面
  • 当Job停掉后,重新从某个Checkpoint点(chk-860)进行恢复时,重新生成Job ID
  • Checkpoint编号会从该次运行基于的编号继续连续生成:chk-861、chk-862、chk-863

checkpoint的建议

  • Checkpoint 间隔不要太短
    • 过短的间对于底层分布式文件系统而言,会带来很大的压力。
    • Flink 作业处理 record 与执行 checkpoint 存在互斥锁,过于频繁的checkpoint,可能会影响整体的性能。
  • 合理设置超时时间

Flink savepoint

Savepoint会在Flink Job之外存储自包含(self-contained)结构的Checkpoint,它使用Flink的Checkpointing机制来创建一个非增量的Snapshot,里面包含Streaming程序的状态,并将Checkpoint的数据存储到外部存储系统中

Flink程序中包含两种状态数据:

用户定义的状态(User-defined State)是基于Flink的Transformation函数来创建或者修改得到的状态数据
系统状态(System State),是指作为Operator计算一部分的数据Buffer等状态数据,比如在使用Window Function时,在Window内部缓存Streaming数据记录
Flink提供了API来为程序中每个Operator设置ID,这样可以在后续更新/升级程序的时候,可以在Savepoint数据中基于Operator ID来与对应的状态信息进行匹配,从而实现恢复。

设置Operator ID:

DataStream<String> stream = env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid("source-id") // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid("mapper-id") // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID

创建Savepoint

创建一个Savepoint,需要指定对应Savepoint目录,有两种方式来指定

需要配置Savepoint的默认路径,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,设置Savepoint存储目录

state.savepoints.dir: hdfs://namenode01.td.com/flink/flink-savepoints

手动执行savepoint命令的时候,指定Savepoint存储目录

bin/flink savepoint :jobId [:targetDirectory]

使用默认配置

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038

为正在运行的Flink Job指定一个目录存储Savepoint数据

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints

从Savepoint恢复

bin/flink run -s :savepointPath [:runArgs]

以上面保存的Savepoint为例,恢复Job运行

bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar

会启动一个新的Flink Job,ID为cdbae3af1b7441839e7c03bab0d0eefd

Savepoint 目录结构

1bbc5是Flink Job ID字符串前6个字符,后面bd967f90709b是随机生成的字符串
_metadata文件包含了Savepoint的元数据信息
其他文件内容都是序列化的状态信息

总结

checkpoint和savepoint是Flink为我们提供的作业快照机制,它们都包含有作业状态的持久化副本。

用几句话总结一下。

  1. checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场。

  2. savepoint是“通过checkpoint机制”创建的,所以savepoint本质上是特殊的checkpoint。

  3. checkpoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预;savepoint面向用户,完全根据用户的需要触发与清理。

  4. checkpoint的频率往往比较高(因为需要尽可能保证作业恢复的准确度),所以checkpoint的存储格式非常轻量级,但作为trade-off牺牲了一切可移植(portable)的东西,比如不保证改变并行度和升级的兼容性。savepoint则以二进制形式存储所有状态数据和元数据,执行起来比较慢而且“贵”,但是能够保证portability,如并行度改变或代码升级之后,仍然能正常恢复。

  5. checkpoint是支持增量的(通过RocksDB),特别是对于超大状态的作业而言可以降低写入成本。savepoint并不会连续自动触发,所以savepoint没有必要支持增量。

http://www.yayakq.cn/news/56972/

相关文章:

  • 鹰潭建设网站公司简述网站开发过程
  • 男同志做爰网站淘宝网页版评价管理在哪里
  • 哪个网站可以悬赏做图帝国cms做视频网站性能如何
  • 河北网站建设联系方式苏州大学网站建设目标
  • 设计感网站有哪些方面浙江省住建厅四库一平台
  • 西安网站到首页排名房地产市场名词解释
  • 二 加强门户网站建设大型网站响应式
  • 南充做网站的公司专业网站设计力荐亿企邦
  • 江苏省住房和城乡建设厅网站首页哈尔滨今天新闻头条
  • 静态网站注入工程建设监理网站
  • wordpress自定义搜索页面石家庄网络推广优化
  • 自建外贸网站如何推广网页搜索青少年普法网官网
  • 网站开发语言比例怎么自己做微网站
  • 无锡网站的优化网络运维工程师教程
  • 香河做网站公司外贸客户哪里找
  • 网站信任的体验如何做专业的网站制作正规公司
  • 做美妆网站的关键词有数据库的网站
  • 交易 网站备案营销网站价格
  • 做网站时为什么导航时两行字腾讯企点
  • 网站建设软件开发公司网页设计ppt模板
  • 公司网站建设解决方案微金所网站谁做的
  • 广安网站建设推荐做网站公司融资多少
  • 怎么把在EXCEL做的查询系统做到网站上开发一个软件需要多久
  • 家居网站应该怎么做淘宝客搜索网站怎么做
  • 做网站的而程序重庆自助建站软件
  • 手机做ppt的免费模板下载网站金富通青岛建设工程有限公司网站
  • 网站开发技术基础教程上海的网站建设
  • 帝国做的网站删除域名后缀网站建设重庆
  • 软文网站大全网站内页制作
  • 做pc端网站特色wordpress中文分类