当前位置: 首页 > news >正文

东莞品牌网站定制免费tickle网站

东莞品牌网站定制,免费tickle网站,cms系统源码,网站广告位制作背景 TwoPhaseCommitSinkFunction是flink中基于二阶段事务提交和检查点机制配合使用实现的精准一次的输出数据汇,但是想要实现精准一次的输出,实际使用中需要注意几个方面,否则不仅仅达不到精准一次输出,反而可能导致数据丢失&am…

背景

TwoPhaseCommitSinkFunction是flink中基于二阶段事务提交和检查点机制配合使用实现的精准一次的输出数据汇,但是想要实现精准一次的输出,实际使用中需要注意几个方面,否则不仅仅达不到精准一次输出,反而可能导致数据丢失,连至少一次的语义都不能达到

TwoPhaseCommitSinkFunction注意事项

TwoPhaseCommitSinkFunction是通过在两阶段提交协议实现的事务,大概简化为一下步骤:
1 在收到检查点分隔符的时候,开启事务,并把记录都写到开启的事务中,
2. 开始进行状态的保存时,把检查点id对应的事务结束掉,做好准备提交的准备,并开启下一个事务

public void snapshotState(FunctionSnapshotContext context) throws Exception {// this is like the pre-commit of a 2-phase-commit transaction// we are ready to commit and remember the transactioncheckState(currentTransactionHolder != null,"bug: no transaction object when performing state snapshot");long checkpointId = context.getCheckpointId();LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'",name(),context.getCheckpointId(),currentTransactionHolder);//当前检查点对应的事务做好准备,比如进行stream.flush等,准备好提交事务preCommit(currentTransactionHolder.handle);// 把当前检查点id对应的事务添加到状态中pendingCommitTransactions.put(checkpointId, currentTransactionHolder);LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);currentTransactionHolder = beginTransactionInternal();LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);// 把当前检查点id对应的事务添加到状态中state.clear();state.add(new State<>(this.currentTransactionHolder,new ArrayList<>(pendingCommitTransactions.values()),userContext));}
  1. 收到检查点完成的通知notify方法,提交第二步中检查点id对应的事务,注意这一步不是每次flink在进行检查点的时候都会通知,这种情况下,某一次的notify方法就需要把前几次的事务一起进行提交了,另外,如果提交某个检查点的事务失败,那么应用会重启,并且在重启后的initSnapshot方法中再次进行事务提交,如果还是失败,这个过程一直持续
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =pendingCommitTransactions.entrySet().iterator();Throwable firstError = null;while (pendingTransactionIterator.hasNext()) {Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();Long pendingTransactionCheckpointId = entry.getKey();TransactionHolder<TXN> pendingTransaction = entry.getValue();if (pendingTransactionCheckpointId > checkpointId) {continue;}LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",name(),checkpointId,pendingTransaction,pendingTransactionCheckpointId);logWarningIfTimeoutAlmostReached(pendingTransaction);try {// 提交事务commit(pendingTransaction.handle);} catch (Throwable t) {//事务失败时记录异常,后面会把异常抛出导致应用重启if (firstError == null) {firstError = t;}}LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);// 事务成功后移除当前的事务pendingTransactionIterator.remove();}if (firstError != null) {// 事务提交失败会抛出异常,导致job异常中止throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",firstError);}}

总结:

1。事务不能提交失败,如果失败会导致作业失败然后重新提交,如果最终没有成功提交,那么数据会丢失
2。数据库服务端的事务超时时间不能设置太短,不能仅仅大于检查点的间隔大小,原因是上面说的,flink有可能丢失检查点完成后的通知消息,所以服务端的事务超时时间要设置的足够大.

http://www.yayakq.cn/news/906655/

相关文章:

  • 搜狐视频网站联盟怎么做wordpress 聘用
  • 保定 网站建设软件开发重庆网站到首页排名
  • 建网站的服务器网页界面设计的定义
  • 网站建设工作年报临猗县 保障住房和建设住建网站
  • 长沙设备建站按效果付费贺卡制作
  • 网站设计提成多少钱唐兴数码网站
  • 安装网站时出现dir专业的丹阳网站建设
  • 展示网站欣赏室内设计联盟官网app
  • 荆州网站推广怎么做网站美化
  • 南阳网站建设口碑VPS如何做网站服务器
  • 网文网站开发方案怎么关闭自己公司网站
  • 艺术网站制作龙岗网站建设过程
  • 网站怎么做扫码微信支付微信功能定制开发
  • 2021软件公司排名网站树状型结构优化
  • 做网站应该用什么配置的电脑网站设计编辑
  • 网站建设后台实训体会网页设计作品集图片
  • 网站建设可以经营吗成都医院手机网站建设
  • 台州知名的网站建设推广文章的推广渠道
  • 网站推广专员的岗位职责是什么wordpress 自定义分类 模板
  • 两个域名指向同一个网站怎么做apsx做的网站怎么发布
  • 中小企业网站优化ios移动网站开发详解 pdf
  • 开发一个网站需要几个人公司网站建设哪儿济南兴田德润实惠吗
  • 网站使用什么语言好网站建设与管理试题及答案
  • 设计师网站1688免费播放电视剧的app有哪些
  • wordpress 微博兰州seo优化
  • wordpress充值提现网站seo是什么意
  • 广州网站设计有哪些专业ftp建网站
  • 做深度游网站 知乎最近免费中文在线电影
  • 网站开发用什么电子商务网站建设新手
  • 网站建设合同附加协议网站服务器失去响应什么意思