当前位置: 首页 > news >正文

县区网站服务器机房建设承德网站建设规划

县区网站服务器机房建设,承德网站建设规划,360建筑网个人信息怎么改,使用wordpress编辑背景: kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性 TextInputFormat源码解析 首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取…

背景:

kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性

TextInputFormat源码解析

首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取其中的数据块,但是不是所有的文件都能进行分块,判断文件是否可以进行分块的代码如下:

protected boolean testForUnsplittable(FileStatus pathFile) {if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {unsplittable = true;return true;}return false;
}private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(Path path) {String fileExtension = extractFileExtension(path.getName());if (fileExtension != null) {return getInflaterInputStreamFactory(fileExtension);} else {return null;}
}

在这里插入图片描述

后缀名称是.gz,.bzip2等的文件都没法切分,如果可以切分,切分的具体代码如下所示:

while (samplesTaken < numSamples && fileNum < allFiles.size()) {// make a split for the sample and use it to read a recordFileStatus file = allFiles.get(fileNum);
// 根据偏移量进行切分FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);// we open the split, read one line, and take its lengthtry {open(split);if (readLine()) {totalNumBytes += this.currLen + this.delimiter.length;samplesTaken++;}} finally {// close the file stream, do not release the bufferssuper.close();}
// 偏移量迁移offset += stepSize;// skip to the next file, if necessarywhile (fileNum < allFiles.size()&& offset >= (file = allFiles.get(fileNum)).getLen()) {offset -= file.getLen();fileNum++;}
}

再来看一下TextInputFormat如何支持checkpoint操作,保存文件的偏移量的代码:

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {super.snapshotState(context);checkState(checkpointedState != null, "The operator state has not been properly initialized.");int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();// 算子列表状态checkpointedState.clear();// 获取文件的当前读取的偏移List<T> readerState = getReaderState();try {for (T split : readerState) {//保存到检查点路径中checkpointedState.add(split);}} catch (Exception e) {checkpointedState.clear();throw new Exception("Could not add timestamped file input splits to to operator "+ "state backend of operator "+ getOperatorName()+ '.',e);}if (LOG.isDebugEnabled()) {LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.",getClass().getSimpleName(),subtaskIdx,readerState.size(),readerState);}
}

从检查点中恢复状态的代码如下:

public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);checkState(checkpointedState == null, "The reader state has already been initialized.");// 初始化算子操作状态checkpointedState =context.getOperatorStateStore().getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);splits = splits == null ? new PriorityQueue<>() : splits;for (T split : checkpointedState.get()) {//从检查点状态中恢复各个切分的分块splits.add(split);}
}
http://www.yayakq.cn/news/66851/

相关文章:

  • 中资源的 域名管理网站织梦建站教程视频
  • 福州市网站建设公司阿里巴巴运营每天必做
  • 怎么弄自己的网站东莞怎么制作网站
  • 亚星网站代理网页游戏排行榜前十名超清画面
  • 电商培训网站网站建设客户需求表 文库
  • 淘宝网站建设的目标双公示 网站专栏建设
  • 如何能进腾讯做游戏视频网站爱前端主图wordpress
  • 陕西建设银行缴费网站贺州市住房与城乡建设局网站
  • 孝感网站制作衡水网站建设的地方
  • 777fj做最好的网站常用的设计网站
  • 西京一师一优课建设网站做旅游宣传图的网站
  • 建设银行住房公积金卡网站下沙网站建设
  • 网站建设法律windows网页制作工具
  • 汉阳网站建设公司做软件需要什么软件
  • 网站开发中标签栏的图标一般都在那个文件中写代码做门户型网站要多少钱
  • 网站设计论文题目网络优化工程师有多累
  • 四秒网站建设建筑模拟3中文版下载
  • 企业网站制作方法微信自助建站系统
  • 网站站建设建设中页中页谢岗镇网站仿做
  • 青岛一点两区救治医院邯郸整站优化
  • 长沙县不错的建站按效果付费南京网站开发南京乐识强
  • 海外百度云网站建设奋进新征程
  • 电影网站怎么做要多少钱网站更换服务器对seo的影响
  • 网站改版对优化的影响相亲小程序源码
  • wordpress能制作视频网站吗要屏蔽一个网站要怎么做
  • 网站建设推进方案吉 360 网站建设
  • 鞍山seoseo确定关键词
  • 网站搭建的费用湖南营销型网站
  • 旅游网站排名前5位的中国城乡与住房建设部网站
  • php个人网站网站建设600元包