当前位置: 首页 > news >正文

网站建设维护费怎么说视频网站建设技术方案

网站建设维护费怎么说,视频网站建设技术方案,成都做公司网站,百度官网下载电脑版文章目录 源码解析Flink源节点数据读取是如何与checkpoint串行执行Checkpoint阶段StreamTask类变量actionExecutor的实现和初始化小结 数据读取阶段小结 总结 源码解析Flink源节点数据读取是如何与checkpoint串行执行 Flink版本:1.13.6 前置知识:源节点…

文章目录

        • 源码解析Flink源节点数据读取是如何与checkpoint串行执行
          • Checkpoint阶段
            • StreamTask类变量actionExecutor的实现和初始化
            • 小结
          • 数据读取阶段
            • 小结
          • 总结

源码解析Flink源节点数据读取是如何与checkpoint串行执行

Flink版本:1.13.6

前置知识:源节点的Checkpoint是由Checkpointcoordinate触发,具体是通过RPC调用TaskManager中对应的Task的StreamTask类的performChecpoint方法执行Checkpoint。

本文思路:本文先分析checkpoint阶段,然后再分析数据读取阶段,最后得出结论:源节点Checkpoint时和源节点读取数据时,都需要抢SourceStreamTask类中lock变量的锁,最终实现串行执行checkpoint与写数据

Checkpoint阶段

Checkpoint在StreamTask的performCheckpoint方法中执行,该方法调用过程如下

// 在StreamTask类中 执行checkpoint操作
private boolean performCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetricsBuilder checkpointMetrics )throws Exception {if (isRunning) {//使用actionExecutor 同步触发checkpointactionExecutor.runThrowing(() -> {....//经过一系列检查subtaskCheckpointCoordinator.checkpointState(checkpointMetaData,checkpointOptions,checkpointMetrics,operatorChain,this::isRunning);});return true;} else {....}}

从上述代码可以看出,Checkpoint执行是由actionExecutor执行器执行

StreamTask类变量actionExecutor的实现和初始化

StreamTask类变量actionExecution的实现

通过代码注释可以知道该执行器的实现是StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor;从SynchronizedStreamTaskActionExecutor源代码可知,该执行器每次执行都需要获得mutex对象锁

  /*** All actions outside of the task {@link #mailboxProcessor mailbox} (i.e. performed by another* thread) must be executed through this executor to ensure that we don't have concurrent method* calls that void consistent checkpoints.** <p>CheckpointLock is superseded by {@link MailboxExecutor}, with {@link* StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor* SynchronizedStreamTaskActionExecutor} to provide lock to {@link SourceStreamTask}.*/
private final StreamTaskActionExecutor actionExecutor;class SynchronizedStreamTaskActionExecutor implements StreamTaskActionExecutor {private final Object mutex;public SynchronizedStreamTaskActionExecutor(Object mutex) {this.mutex = mutex;}@Overridepublic void run(RunnableWithException runnable) throws Exception {synchronized (mutex) {runnable.run();}}
}

StreamTask变量actionExecution初始化

actionExecutor变量在StreamTask中定义,在构造方法中初始化;该构造方法由SourceStreamTask调用,并传入SynchronizedStreamTaskActionExecutor对象,代码如下所示

//   SourceStreamTask的方法
private SourceStreamTask(Environment env, Object lock) throws Exception {//调用的StreamTask构造函数,传入SynchronizedStreamTaskActionExecutor对象super(env,null,FatalExitExceptionHandler.INSTANCE,//初始化actionExecutorStreamTaskActionExecutor.synchronizedExecutor(lock));//将lock对象赋值给类变量lockthis.lock = Preconditions.checkNotNull(lock);this.sourceThread = new LegacySourceFunctionThread();getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);
}//  StreamTask的方法
protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,//初始化actionExecutorStreamTaskActionExecutor actionExecutor)throws Exception {this(environment,timerService,uncaughtExceptionHandler,actionExecutor,new TaskMailboxImpl(Thread.currentThread()));
}protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,StreamTaskActionExecutor actionExecutor,TaskMailbox mailbox)throws Exception {super(environment);this.configuration = new StreamConfig(getTaskConfiguration());this.recordWriter = createRecordWriterDelegate(configuration, environment);//初始化actionExecutorthis.actionExecutor = Preconditions.checkNotNull(actionExecutor);this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);.......}
小结

actionExecutor执行器每次执行都需要获得mutex对象,mutex对象就是SourceStreamTask类中的lock对象;即算子每次执行Checkpoint时都需要获得SourceStreamTask类中lock对象锁才能进行

数据读取阶段

在执行Checkpoint时控制读取源端,则控制点必定是在调用SourceContext的collect方法时

@Override
public void run(SourceContext<String> ctx) throws Exception {int i = 0;while (true) {//在这个方法里处理ctx.collect(String.valueOf(i));}
}

点击collection查看实现,选择NonTimestampContext查看代码,collect()实现如下

@Override
public void collect(T element) {synchronized (lock) {output.collect(reuse.replace(element));}
}

所以这里控制数据读取发送是通过lock来控制,lock是如何初始化的?

通过NonTimestampContext构造方法可以定位到StreamSourceContexts->getSourceContext方法;

public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(TimeCharacteristic timeCharacteristic,ProcessingTimeService processingTimeService,Object checkpointLock,StreamStatusMaintainer streamStatusMaintainer,Output<StreamRecord<OUT>> output,long watermarkInterval,long idleTimeout) {final SourceFunction.SourceContext<OUT> ctx;switch (timeCharacteristic) {....case ProcessingTime://初始化NonTimestampContextctx = new NonTimestampContext<>(checkpointLock, output);break;default:throw new IllegalArgumentException(String.valueOf(timeCharacteristic));}return ctx;
}

向上追踪,在StreamSource类中调用getSourceContext:

public void run(final Object lockingObject,final StreamStatusMaintainer streamStatusMaintainer,final Output<StreamRecord<OUT>> collector,final OperatorChain<?, ?> operatorChain)throws Exception {....this.ctx =StreamSourceContexts.getSourceContext(timeCharacteristic,getProcessingTimeService(),lockingObject,streamStatusMaintainer,collector,watermarkInterval,-1);....}
// 再向上最终run方法的调用点->是由内部方法run调用
public void run(final Object lockingObject,final StreamStatusMaintainer streamStatusMaintainer,final OperatorChain<?, ?> operatorChain)throws Exception {run(lockingObject, streamStatusMaintainer, output, operatorChain);
}//再向上最终run方法的调用点->SourceStreamTask 调用run 然后再代用mainOpterator run方法
@Override
public void run() {try {// 使用的是类变量lockmainOperator.run(lock, getStreamStatusMaintainer(), operatorChain);if (!wasStoppedExternally && !isCanceled()) {synchronized (lock) {operatorChain.setIgnoreEndOfInput(false);}}completionFuture.complete(null);} catch (Throwable t) {// Note, t can be also an InterruptedExceptioncompletionFuture.completeExceptionally(t);}
}
小结

所以在源端写数据时,必须获得SourceStreamTask中的类变量lock的锁才能进行写数据;类变量lock刚好和执行器时同一个对象

总结

flink的source算子在Checkpoint时,是通过锁对象SourceStreamTask.lock,来控制源端数据产生和Checkpoint的有序进行

http://www.yayakq.cn/news/483471/

相关文章:

  • jsp个人网站怎样做温州网站设计图片大全
  • 无锡做公司网站的域名备案进度查询
  • 苏州企业黄页seo网站优化服务合同
  • 东台建设网站的公司垫江网站建设价格
  • 交互式网站制作金华网站建设方案策划
  • 商标设计网站免费wordpress 制作手机站
  • 青岛建网站多少钱网站数据库分离怎么做
  • 天津网站建设座机号免费注册网站
  • 淄博网站建设详细策划男女性做那个视频网站
  • 网站专业优化鱼头seo推广
  • windows7建设网站什么公司可以做网站
  • wordpress 即时站内搜索网页版qq登录入口版qq账号登录界面
  • 有做的小说网站pc网站转换成app
  • 网站开发需要人员东莞最好的网络公司找火速
  • 佛山网站建设骏域网站建设专家优化网站排名推荐公司
  • 织梦系统做的网站忘记登录密码做快递网站制作
  • 网站规划建设方案农业绿化风格官方网站开发与定制
  • 网站运行费用一般多少营销策划方案的内容
  • 网页建站的费用想要黑掉一个网站 要怎么做
  • 连云港做网站制作科技布沙发脏了用什么办法清洗
  • 烟台高新区规划国土建设局网站辽宁省住房和城乡建设厅网站打不开
  • wordpress 移动到回收站发生错误网络技术包括哪些具体内容
  • 做网站一屏有多大保险网站推荐
  • 网站制作学费多少钱建设学校网站论文
  • 网站页面相关产品链接怎么做江海区建设局网站
  • 公司网站建设服务费怎么做账wordpress修改网址
  • 优秀网站设计案例中国做网站代码编辑工具
  • 网站备案怎么查询什么是网页设计培训
  • 玉山网站制作商业设计师是做什么的
  • 网站制作国际连锁低价服装网站建设