当前位置: 首页 > news >正文

热度网络网站建设wordpress配置七牛cdn

热度网络网站建设,wordpress配置七牛cdn,英文建站网站,校园网站建设软件背景 项目中有很多ods层(mysql 通过cannal)kafka,需要对这些ods kakfa做一些etl操作后写入下一层的kafka(dwd层)。 一开始采用的是executeSql方式来执行每个ods→dwd层操作,即类似: def main(…

背景

项目中有很多ods层(mysql 通过cannal)kafka,需要对这些ods kakfa做一些etl操作后写入下一层的kafka(dwd层)。

一开始采用的是executeSql方式来执行每个ods→dwd层操作,即类似:

 def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)val configuration: Configuration = tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dml,在insert语句中调用etl_handle进行预处理和写入tableEnv.executeSql(INSERT_DWD_TABLE1)tableEnv.executeSql(INSERT_DWD_TABLE2)... 
}

当有多个ods->dwd操作放在同一个flink作业中时,发现这种方式会导致每次insert操作都是单独的DAG,非常消耗资源,特别是这些处理都是比较轻量级的,最好是能融合在同一个DAG中共享资源。

解决方案

查看flink文档:INSERT 语句 | Apache Flink

因此,可以采用statementset的方式,将不同insert sql进行分组执行,每组的insert sql会先被缓存到 StatementSet 中,并在StatementSet.execute() 方法被调用时,同一组的 insert sql(sink) 会被优化成一张DAG共用taskmanager,减少资源浪费,即类似:

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)val configuration: Configuration = tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dmltableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE1).addInsertSql(INSERT_DWD_TABLE2).addInsertSql(INSERT_DWD_TABLE3).execute()tableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE4).addInsertSql(INSERT_DWD_TABLE5).addInsertSql(INSERT_DWD_TABLE6).execute()
}

其他

如果是纯flink sql而不用data stream api,也是可以达到同样的效果的。

http://www.yayakq.cn/news/30976/

相关文章:

  • 上海网站制作上海网站制作邯郸网站开发定制
  • 网站关键词修改网站建设的技巧
  • 织梦开发网站芜湖市建设银行支行网站
  • 网站升级改版摄影作品投稿平台
  • 用phpnow搭建网站的整个流程网页修改和编辑的软件有哪些
  • 如何做自助搜券网站做视频网站用什么源码
  • wordpress 手机站目录seo在线诊断工具
  • 网页首站工商注册公司名称核准
  • 成都网站公司ccyy切换路线专线
  • 外贸网站建设 深圳网站建设课程设计内容
  • 石家庄中小企业网站制作重庆公司做网站
  • 青浦网站设计常州 网站设计
  • 100个免费推广网站定制网站与模板建站维护
  • 网站建设公司好发信息网网络销售推广是做什么的具体
  • 中国网站建设的利弊中资源 网站域名解析
  • 网站没有做伪静态是什么样子创鑫时代广告公司简介
  • 青岛网站设计公司成都手机网站建设价格
  • 网站建设工作总结范文用帝国做的网站只收录首页
  • 做甜品台的网站青岛网站制作服务
  • 阿里云上可以做网站吗百度seo搜索排名
  • win7局域网网站开发秦皇岛海三建设一分公司
  • 国外html5网站源码口腔医院网站做优化
  • dw用ps切片做网站公司资质查询官方网站
  • 旅行社网站开发 论文网站添加百度商桥
  • 做图片详情网站网页制作图片切换
  • 网站建设公司自贡网络培训的感受
  • 怎么查询网站开发时间wordpress调用搜索结果
  • 东莞企业网站公司wordpress优化
  • 网站建设和网络推广哪个难做大连建立网页
  • 广州做护肤品的网站推广计划方案模板