当前位置: 首页 > news >正文

哪些网站可以做养殖的广告太原建站的模板

哪些网站可以做养殖的广告,太原建站的模板,导航网站开发,收录平台在《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。 select word, count(1) as count from source group by word; ------------------------------------------------------ |…

在《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。

select word, count(1) as `count` from source group by word;
+--------------------------------+----------------------+
|                           word |                count |
+--------------------------------+----------------------+
|                              A |                    3 |
|                              B |                    1 |
|                              C |                    2 |
|                              D |                    2 |
|                              E |                    1 |
+--------------------------------+----------------------+

在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。

Sink

Sink用于将Reduce结果输出到外部系统。它也是通过一个表(Table)来表示结构。这个和MapReduce思路中的Map很类似。

Print

为了简单起见,我们让Sink的表连接的外部系统是print。这样我们就可以在控制台上看到数据。

    # define the sinkmy_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'print');"""t_env.execute_sql(my_sink_ddl).print()

需要强调的是,我们没有给sink的表创建主键。这个会在后面文章中作为一个对比案例进行分析。
这一步只能创建表和连接器,具体执行还要执行下一步。

Execute

因为source和WordsCountTableSink是两张表,分别表示数据的输入和输出结构。如果要打通输入和输出,则需要将source表中的数据通过某些计算,插入到WordsCountTableSink表中。于是我们主要使用的是insert into指令。

    # execute insertmy_select_ddl = """insert into WordsCountTableSinkselect word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()

完整代码如下

import argparse
import logging
import sysfrom pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)def word_count(input_path):config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)# define the sourcemy_source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')""".format(input_path)t_env.execute_sql(my_source_ddl).print()tab = t_env.from_path('source')# define the sinkmy_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'print');"""t_env.execute_sql(my_sink_ddl).print()# execute insertmy_select_ddl = """insert into WordsCountTableSinkselect word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input)

执行命令如下

python sql_print.py --input input1.csv

输出结果如下

Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform CANNOT be used with method=DIRECT_READ.
OK
OK
+I[A, 3]
+I[B, 1]
+I[C, 2]
+I[D, 2]
+I[E, 1]

因为使用的是批处理模式(in_batch_mode),我们看到Flink将所有数据计算完整成,成批的执行了新增操作(+代表新增)。这块对比我们将在后续将流处理时介绍区别。
附上input1.csv内容

"A",
"B",
"C",
"D",
"A",
"E",
"C",
"D",
"A",
http://www.yayakq.cn/news/943219/

相关文章:

  • 网络商务网站seo优化是什么意思
  • 网站域名销售公司做网站走什么费
  • 桂林技术交流站网站学什么
  • 内部网站建设计划东莞建设网站的位置
  • 石家庄百度推广家庄网站建设应急管理部
  • WordPress网站根目录有哪些营销智库网站
  • 太原网站制作哪里便宜企业推广策略
  • 网站项目设计与制作优化系统功能
  • 做网站的时候想要满屏简述电子政务网站设计的技术
  • 网站做行测题租远程服务器
  • 网站建设如何使图片翻转重庆市建设工程信息网官网入口
  • 大连网站建设公司wordpress sqll
  • 写作网站大全营销模板WordPress
  • 各类网站厦门网站排名优化费用
  • 找单位做网站需要注意什么祝明电子商务网站建设实验报告
  • 长沙网站建设 鼎誉郴州网站推广
  • 中铁建设集团北京工程有限公司网站如何做搜索引擎优化
  • 医疗网站seo怎么做新产品上市的营销策划方案
  • 商水住房城乡建设网站贵阳专业做网站公司有哪些
  • 网站建设管理规定传媒公司主要做什么
  • 个人或主题网站建设实验报告网络营销产生的基础包括
  • 江苏省高职重点专业群建设网站跨越网站建设科技有限公司
  • 制图网站品牌建设调研
  • 在网站上做漂浮网络公司怎么优化网站
  • 网站功能优化的意义网站的优点有哪些方面
  • 如何写一份网站优化建设的方案百度app下载并安装
  • 在哪请人做网站外贸优秀网站
  • 东莞营销型网站装修设计用什么软件
  • 志愿者网站建设wordpress修改网页
  • 策划的网站推广公司哪家好