当前位置: 首页 > news >正文

最早做淘宝返利的网站电子商务网站的特色

最早做淘宝返利的网站,电子商务网站的特色,网站免费空间,wordpress 主题 导出背景 在某个场景中,需要从Kafka中获取数据,经过转换处理后,需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错: public static void main(String[] args) {final StreamExecuti…

背景

在某个场景中,需要从Kafka中获取数据,经过转换处理后,需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错:

public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet = tEnv.createStatementSet();String s = LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStream<String> dataStream = env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table = tEnv.fromDataStream(dataStream);table.insertInto("kafka_sink").execute();table.insertInto("kafka_sink_1").execute();streamStatementSet.execute();}
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:199) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:187) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110) ~[?:?]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:877) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:756) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:955) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:57) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]

解决

使用 StreamStatementSet. 具体参考官网:
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-between-datastream-and-table

改良后的代码:

public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet = tEnv.createStatementSet();String s = LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStream<String> dataStream = env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table = tEnv.fromDataStream(dataStream);streamStatementSet.addInsert("kafka_sink", table);streamStatementSet.addInsert("kafka_sink_1", table);streamStatementSet.execute();}
http://www.yayakq.cn/news/813217/

相关文章:

  • 站长工具seo综合查询全面解析做pc端网站价位
  • 宝安西乡做网站丽江网站开发
  • 无锡网站制作排名微信商城模板
  • 做网站用哪个写比较好wordpress评论区插件
  • 做类似58同城的网站创建网站时可使用的数据库有
  • 深圳企业网站建设制作wordpress留言标签板
  • 做骗子网站学网站建设需要什么软件有哪些
  • 北京网站开发网络公司吉林最新消息今天新增
  • 兰州展柜公司网站建设上海市有哪些公司
  • 建设网站申请空间需要多少钱wordpress调用二级分类目录
  • 国内最大的软件开发商成都官网seo厂家
  • 做医药代表去什么招聘网站seo网站优化方法
  • 网站建设费计入什么科目比较好<网站建设与运营》
  • 河北通信网站建设网站建设怎么用长尾做标题
  • 建设银行网站查余额阜阳市建设工程质量检测站网站
  • 惠州营销网站建设公司互联网技术包括哪些
  • 网站的代码在哪里设置北京王府井在几环
  • 苏州网络科技公司建网站整个网站全是图片做的
  • 制作网页心得做网站优化好的网络公司
  • 网站程序 seo基于wordpress的博客
  • 梦扬科技 合肥网站建设jsp网站开发工具
  • 商城网站建设 优帮云下什么软件做网站
  • 重庆市建设工程管理协会网站活动线报资源网
  • iis网站权限怎么设置网页加速器手机版
  • 如何区分官方网站和空壳网站深圳南园网站建设
  • 盐城市网站建设公司布局网站开发
  • 更新网站怎么弄seo提升关键词排名
  • 网站建设与运行的盈利收入黑龙江省中国建设银行网站首页
  • 专业做室内设计的网站有哪些内容建设网站女装名字大全
  • 网站网络投票建设步骤合肥网站建合肥网站建设找蓝领商务