当前位置: 首页 > news >正文

源码网站取名河北衡水建设网站公司电话

源码网站取名,河北衡水建设网站公司电话,室内装饰网站模板,电商小程序怎么做背景 在Flink中有两种基本的状态:Keyed State和Operator State,Operator State很好理解,一个特定的Operator算子共享同一个state,这是实现层面很好做到的。 但是 Keyed State 是怎么实现的?一般来说,正常的…

背景

在Flink中有两种基本的状态:Keyed State和Operator StateOperator State很好理解,一个特定的Operator算子共享同一个state,这是实现层面很好做到的。
但是 Keyed State 是怎么实现的?一般来说,正常的人第一眼就会想到:一个task绑定一个Keyd State,从网上随便查找资料就能发现正确的答案是:对于每一个Key会绑定一个State,但是这在Flink中是怎么实现的呢?
注意:这里我们只讲Flink中是怎么实现一个Key对应一个State的,其他细节并不细说,且state的backend为RocksDB

闲说杂谈

我们以ValueState类型的Keyed State举例:


ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =new ValueStateDescriptor<>("indexState",TypeInformation.of(HoodieRecordGlobalLocation.class));
ValueState<HoodieRecordGlobalLocation> indexState = context.getKeyedStateStore().getState(indexStateDesc)
....
indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation())
  • context.getKeyedStateStore().getState是获取对应keyState,最终的调用链如下:

     DefaultKeyedStateStore.getState -> getPartitionedState||\/RocksDBKeyedStateBackend.getPartitionedState -> getOrCreateKeyedState -> createInternalState -> tryRegisterKvStateInformation||\/RocksDBValueState.create(创建RocksDBValueState)                                                                             

    这里的 tryRegisterKvStateInformation会涉及到RocksDB ColumnFamily的创建:

    RocksDBOperationUtils.createStateInfo -> createColumnFamilyDescriptor 
    // createColumnFamilyDescriptor的部分代码:
    ColumnFamilyOptions options =createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName());
    if (ttlCompactFiltersManager != null) {ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options);
    }
    byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
    ...
    return new ColumnFamilyDescriptor(nameBytes, options);

    其实最终会发现RocksDBColumnFamily是跟ValueStateDescriptor也就是描述符的名字有关的,这就是为什么描述符必须是唯一的,关于RocksDBColumnFamily,可以参考RocksDB 简介
    注意此时返回是key对应的一个State的ColumnFamily,该Family包括该task所有的key的value值

  • indexState.update 这里是更新indexState得值
    因为上一步得到只是该Task所对应的ColumanFamily所对应的所有的values,也就是* Flink中的Key-Groups*,(关于Key-Groups可以参考Apache-Flink深度解析-State)

      public void update(V value) {if (value == null) {clear();return;}try {backend.db.put(columnFamily,writeOptions,serializeCurrentKeyWithGroupAndNamespace(),serializeValue(value));} catch (Exception e) {throw new FlinkRuntimeException("Error while adding data to RocksDB", e);}}
    

    最终的调用链如下:

    RocksDBValueState.update -> serializeCurrentKeyWithGroupAndNamespace||\/
    SerializedCompositeKeyBuilder.buildCompositeKeyNamespace||\/
    serializeNamespace(namespace, namespaceSerializer) -> keyOutView.getCopyOfBuffer()   

    这里的keyOutView.getCopyOfBuffer是会获得的record的key,所以在backend.db.put方法中才会更新对应的Key值。
    但是什么时候Record的key信息会被写入到keyOutView中去呢?

  • Record的key何时被写到keyOutView

    AbstractStreamTaskNetworkInput.emitNext -> processElement||\/
    OneInputStreamTask.emitRecord||\/
    OneInputStreamOperator.setKeyContextElement -> setKeyContextElement1 -> setKeyContextElement||\/
    AbstractStreamOperator.setCurrentKey||\/
    StreamOperatorStateHandler.setCurrentKey||\/
    RocksDBKeyedStateBackend.setCurrentKey||\/
    SerializedCompositeKeyBuilder.setCurrentKey -> serializeKeyGroupAndKey||\/
    keySerializer.serialize(key, keyOutView);    

    最后一步keySerializer.serialize(key, keyOutView)一个Record的key就被写到keyOutView中,也就是说对应的key是从每个record中获取的,所以在backend.db.put方法中就能获取到对应的Key

其他

对于keyedStateStore是在哪里初始化的,可以看AbstractStreamOperatorinitializeState方法:

final StreamOperatorStateContext context =streamTaskStateManager.streamOperatorStateContext(getOperatorID(),getClass().getSimpleName(),getProcessingTimeService(),this,keySerializer,streamTaskCloseableRegistry,metrics,config.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND,runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),runtimeContext.getUserCodeClassLoader()),isUsingCustomRawKeyedState());stateHandler =new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);

这个方法里也包括了keyedStatedBackendoperatorStateBackend等初始化, 具体的细节后续再解析。

http://www.yayakq.cn/news/630850/

相关文章:

  • 做市场浏览什么网站查商标
  • 网站建设销售一个月开几个单wordpress大学最新模板下载地址
  • 云梦县城乡建设局网站wordpress添加发布视频
  • 贵阳市城乡建设厅网站香奈儿网站建设策划书
  • 英文网站建设需求wordpress移动顶部导航菜单
  • 博客网站主页代码html做英文网站内容来源
  • 网站排名快速见效的方法网站怎么管理维护
  • 网站开发技术书籍oss挂载到wordpress
  • 企业网站推广内容融资平台有哪些
  • 中国免费网站服务器文化建设基金管理有限公司网站
  • 广州建设网站是什么样的广东品牌网站建设报价
  • 益阳购物网站开发设计网站富文本的内容怎么做
  • 门户网站建设多久东莞网推广网站建设
  • 企业网站维护的要求包括宿迁市区建设局网站
  • 建立一个个人介绍网站wordpress保存图片插件
  • 微网站如何做微信支付宝支付宝支付宝沈阳网站建设优化
  • 网站打开404错误怎么解决方法软件技术是什么专业类别
  • 网站名称要注册吗海外贸易网站
  • 夫妻网站开发荆门网站建设
  • 苏州外贸企业网站建设广州做网页的公司
  • 深圳市网站建设外包公司排名济南手机网站建设报价
  • 建站兔软件下载网站建制作公司
  • 做手机网站需要多少钱建设官方网站请示
  • 淮安百度网站建设网页设计图片切换
  • 古风网站建设模板下载北京最新消息今天新闻
  • wordpress能找工作吗盖州网站优化
  • 科技画网站制作带优化
  • 北京做公司网站的公司无锡公司网站建设服务
  • 谁会在阿里云建网站织梦免费自适应网站模板
  • 那家做网站好中国seo关键词优化工具