当前位置: 首页 > news >正文

网站的物理结构网站开发工作室简介

网站的物理结构,网站开发工作室简介,湘潭公司做网站,上海手机网站建设哪家专业背景 算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态 算子联合列表状态 首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况 算子联合列表状态主…

背景

算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态

算子联合列表状态

首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况
在这里插入图片描述
算子联合列表状态主要由这两个方法处理:
1初始化方法

public final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore = context.getOperatorStateStore();// 在初始化方法中获取联合列表状态this.unionOffsetStates =stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,createStateSerializer(getRuntimeContext().getExecutionConfig())));if (context.isRestored()) {restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// 把联合列表状态的数据都恢复成类的本地变量中// populate actual holder for restored statefor (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {restoredState.put(kafkaOffset.f0, kafkaOffset.f1);}LOG.info("Consumer subtask {} restored state: {}.",getRuntimeContext().getIndexOfThisSubtask(),restoredState);} else {LOG.info("Consumer subtask {} has no restore state.",getRuntimeContext().getIndexOfThisSubtask());}}

2.开始通知检查点开始的方法:

public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!running) {LOG.debug("snapshotState() called on closed source");} else {unionOffsetStates.clear();final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {// the fetcher has not yet been initialized, which means we need to return the// originally restored offsets or the assigned partitionsfor (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :subscribedPartitionsToStartOffsets.entrySet()) {// 进行checkpoint时,把数据保存到联合列表状态中进行保存unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);}} else {HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),kafkaTopicPartitionLongEntry.getValue()));}}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// truncate the map of pending offsets to commit, to prevent infinite growthwhile (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {pendingOffsetsToCommit.remove(0);}}}}
http://www.yayakq.cn/news/556652/

相关文章:

  • 厦门亚龙网站建设上海高端网站建设服务
  • 网页中网站设计规划流程建设银行顺德分行网站
  • 淮南做网站推广网站分辨率做多大
  • 与恶魔做交易的网站学校网站建设栏目设置
  • 深圳 手机网站建立网站的公司
  • 网站一般做多大像素写字楼装修风格
  • 郑州网站建设公司代运营校园文创产品设计
  • 王建设医生网站有名做网站公司
  • 深圳公司网站建设设计wordpress页面编辑器
  • 如何做pc网站适配网站建设教程软件下载
  • 自己做网站的费用网站项目策划书方案
  • 云主机多个网站智能响应式网站建设
  • 怎么建个公司网站网站流量被用完了
  • 电子商务网站建设教学力软框架做网站
  • 常德建设企业网站找程序员代写程序
  • 网站一年域名费用多少钱高清的广州网站建设
  • 交互网站是什么东莞网站制作及推广价格
  • 如何做网站模版海贼王网页设计素材
  • 做一个付费网站多少钱企业建设网站的一般过程
  • 室内设计可以做网站吗wordpress dux5.3
  • 台州网站推广技巧付费洛阳青峰网络科技有限公司
  • 做网站最好用的软件可以做试卷的网站英语怎么说
  • 苏州建筑类网站建设wordpress获取当前子分类
  • 网站设计是怎么设计的php主做哪种类型网站
  • 第一次开票网站建设怎么开做网站的生产方式
  • 神奇的工作室最新网站wordpress 导入的模板
  • 深圳网站建设lxhd企业展厅设计图片欣赏
  • 怎么做网站受众分析wordpress会员注册为
  • 企业网站设计需要了解网站推广计划包括哪些
  • 做设计的素材网站有哪些成都网站建设有名的