当前位置: 首页 > news >正文

银行内部网站建设建议镇平县两学一做网站

银行内部网站建设建议,镇平县两学一做网站,深圳网站建设推广公司,智通人才东莞招聘网背景: kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性 TextInputFormat源码解析 首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取…

背景:

kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性

TextInputFormat源码解析

首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取其中的数据块,但是不是所有的文件都能进行分块,判断文件是否可以进行分块的代码如下:

protected boolean testForUnsplittable(FileStatus pathFile) {if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {unsplittable = true;return true;}return false;
}private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(Path path) {String fileExtension = extractFileExtension(path.getName());if (fileExtension != null) {return getInflaterInputStreamFactory(fileExtension);} else {return null;}
}

在这里插入图片描述

后缀名称是.gz,.bzip2等的文件都没法切分,如果可以切分,切分的具体代码如下所示:

while (samplesTaken < numSamples && fileNum < allFiles.size()) {// make a split for the sample and use it to read a recordFileStatus file = allFiles.get(fileNum);
// 根据偏移量进行切分FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);// we open the split, read one line, and take its lengthtry {open(split);if (readLine()) {totalNumBytes += this.currLen + this.delimiter.length;samplesTaken++;}} finally {// close the file stream, do not release the bufferssuper.close();}
// 偏移量迁移offset += stepSize;// skip to the next file, if necessarywhile (fileNum < allFiles.size()&& offset >= (file = allFiles.get(fileNum)).getLen()) {offset -= file.getLen();fileNum++;}
}

再来看一下TextInputFormat如何支持checkpoint操作,保存文件的偏移量的代码:

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {super.snapshotState(context);checkState(checkpointedState != null, "The operator state has not been properly initialized.");int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();// 算子列表状态checkpointedState.clear();// 获取文件的当前读取的偏移List<T> readerState = getReaderState();try {for (T split : readerState) {//保存到检查点路径中checkpointedState.add(split);}} catch (Exception e) {checkpointedState.clear();throw new Exception("Could not add timestamped file input splits to to operator "+ "state backend of operator "+ getOperatorName()+ '.',e);}if (LOG.isDebugEnabled()) {LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.",getClass().getSimpleName(),subtaskIdx,readerState.size(),readerState);}
}

从检查点中恢复状态的代码如下:

public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);checkState(checkpointedState == null, "The reader state has already been initialized.");// 初始化算子操作状态checkpointedState =context.getOperatorStateStore().getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);splits = splits == null ? new PriorityQueue<>() : splits;for (T split : checkpointedState.get()) {//从检查点状态中恢复各个切分的分块splits.add(split);}
}
http://www.yayakq.cn/news/210620/

相关文章:

  • 公司招聘网站 哪个部门做建个人博客网站
  • 北京网站优化外包用虚拟主机好还是阿里云wordpress
  • 深圳网站高端建设网站网页设计html
  • 阿里云网站建设考试认证题深圳的网站建设公司pestl分析
  • 哪些网站是用php做的wordpress 去掉底部版权
  • 一流的企业网站建设竞价推广运营
  • 做网站时候图片和视频放在哪里wordpress 换 ip
  • 网站建设整体方案广州番禺房价最新楼盘价格
  • 昆山做网站哪家好响应式儿童网站源码
  • eclipse做网站表格东莞创意网站设计
  • 网站众筹网站开发
  • 自己做网站自己买服务器wordpress添加首页友情链接
  • 桓台网站自己建网站流程
  • wordpress 显示指定分类淄博seo外包公司
  • 南宁网站建设公司怎么赚钱网络营销课程总结与心得体会
  • 济南网站优化做网站 域名是怎么回事
  • 渭南网站建设电话wordpress 所有页面空白页
  • 有什么做论文的网站做个营销网站
  • 新网站多久收录内页如何新建一个网页页面
  • 手机做任务网站网络营销试题
  • 企业网站建设立项书物联网平台是什么意思
  • 深圳企业企业网站建设免费平面设计教程全集
  • asp网站开发教案南京米雅途做网站如何
  • 电子商务网站建设策划书的流程门户网站直接登录系统
  • 永清住房和城乡建设部网站uc官方网站开发者中心
  • 网站 linux 服务器配置沈阳网络营销推广的公司
  • 网站数据库备份怎么做秦皇岛吧最新事件
  • 沈阳网站建设024w长春市网站建设
  • 网站系统是一个典型的wordpress文字占满
  • 做网站和优化的公司微信公众平台官网入口