佛山快速建站哪家服务专业,wordpress搬运小红书内容,郑州平台类网站,为企业做好服务优化营商环境Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置如何处理更新结果、时态表、流上的join、流上的确定性以及查询配置 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例1 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例2 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例3 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例4 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例6 17、Flink 之Table API: Table API 支持的操作1 17、Flink 之Table API: Table API 支持的操作2 18、Flink的SQL 支持的操作和语法 19、Flink 的Table API 和 SQL 中的内置函数及示例1 19、Flink 的Table API 和 SQL 中的自定义函数及示例2 19、Flink 的Table API 和 SQL 中的自定义函数及示例3 19、Flink 的Table API 和 SQL 中的自定义函数及示例4 20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL可以直接提交 SQL 任务到集群上
22、Flink 的table api与sql之创建表的DDL 24、Flink 的table api与sql之Catalogs介绍、类型、java api和sql实现ddl、java api和sql操作catalog-1 24、Flink 的table api与sql之Catalogsjava api操作数据库、表-2 24、Flink 的table api与sql之Catalogsjava api操作视图-3 24、Flink 的table api与sql之Catalogsjava api操作分区与函数-4 25、Flink 的table api与sql之函数(自定义函数示例) 26、Flink 的SQL之概览与入门示例 27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例1 27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例2 27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例3 27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例4 27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例5 27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例6 27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例7 28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句 29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE1 29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE2 30、Flink SQL之SQL 客户端通过kafka和filesystem的例子介绍了配置文件使用-表、视图等 32、Flink table api和SQL 之用户自定义 Sources Sinks实现及详细示例 33、Flink 的Table API 和 SQL 中的时区 41、Flink之Hive 方言介绍及详细示例 42、Flink 的table api与sql之Hive Catalog 43、Flink之Hive 读写及详细验证示例 44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的 文章目录 Flink 系列文章一、时区1、TIMESTAMP vs TIMESTAMP_LTZ1、TIMESTAMP 类型2、TIMESTAMP_LTZ 类型 2、时区的作用1、确定时间函数的返回值2、TIMESTAMP_LTZ 字符串表示 3、时间属性和时区1、处理时间和时区2、事件时间和时区1、TIMESTAMP 上的事件时间属性2、TIMESTAMP_LTZ 上的事件时间属性 4、夏令时支持5、Batch 模式和 Streaming 模式的区别 本文简单的介绍了Flink 中关于时区的概念并以具体的示例进行说明。 本文依赖flink、kafka集群能正常使用。 本文分为5个部分即TIMESTAMP vs TIMESTAMP_LTZ介绍、时区的作用、时区属性与时区、夏令时支持与流批关于时间的处理区别。 本文的示例是在Flink 1.17版本中运行。
一、时区
Flink 为日期和时间提供了丰富的数据类型 包括 DATE TIME TIMESTAMP TIMESTAMP_LTZ INTERVAL YEAR TO MONTH INTERVAL DAY TO SECOND (更多详情请参考 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 中的 Date and Time)。 Flink 支持在 session 会话级别设置时区更多详情请参考 15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置如何处理更新结果、时态表、流上的join、流上的确定性以及查询配置 中的 Planner 配置 table.local-time-zone 部分。 Flink 对多种时间类型和时区的支持使得跨时区的数据处理变得非常容易。
1、TIMESTAMP vs TIMESTAMP_LTZ
1、TIMESTAMP 类型
TIMESTAMP§ 是 TIMESTAMP§ WITHOUT TIME ZONE 的简写 精度 p 支持的范围是0-9 默认是6。TIMESTAMP 用于描述年 月 日 小时 分钟 秒 和 小数秒对应的时间戳。TIMESTAMP 可以通过一个字符串来指定例如
Flink SQL SELECT TIMESTAMP 1970-01-01 00:00:04.001;
-------------------------
| 1970-01-01 00:00:04.001 |
-------------------------2、TIMESTAMP_LTZ 类型
TIMESTAMP_LTZp 是 TIMESTAMPp WITH LOCAL TIME ZONE 的简写 精度 p 支持的范围是0-9 默认是6。TIMESTAMP_LTZ 用于描述时间线上的绝对时间点 使用 long 保存从 epoch 至今的毫秒数 使用int保存毫秒中的纳秒数。 epoch 时间是从 java 的标准 epoch 时间 1970-01-01T00:00:00Z 开始计算。 在计算和可视化时 每个 TIMESTAMP_LTZ 类型的数据都是使用的 session 会话中配置的时区。TIMESTAMP_LTZ 没有字符串表达形式因此无法通过字符串来指定 可以通过一个 long 类型的 epoch 时间来转化(例如: 通过 Java 来产生一个 long 类型的 epoch 时间 System.currentTimeMillis())
Flink SQL CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3);
Flink SQL SET table.local-time-zone UTC;
Flink SQL SELECT * FROM T1;
---------------------------
| TO_TIMESTAMP_LTZ(4001, 3) |
---------------------------
| 1970-01-01 00:00:04.001 |
---------------------------Flink SQL SET table.local-time-zone Asia/Shanghai;
Flink SQL SELECT * FROM T1;
---------------------------
| TO_TIMESTAMP_LTZ(4001, 3) |
---------------------------
| 1970-01-01 08:00:04.001 |
---------------------------TIMESTAMP_LTZ 可以用于跨时区的计算因为它是一个基于 epoch 的绝对时间点比如上例中的 4001 毫秒代表的就是不同时区的同一个绝对时间点。 补充一个背景知识在同一个时间点 全世界所有的机器上执行 System.currentTimeMillis() 都会返回同样的值。 (比如上例中的 4001 milliseconds), 这就是绝对时间的定义。
2、时区的作用
本地时区定义了当前 session会话所在的时区 你可以在 Sql client 或者应用程序中配置。
java 代码片段示例
EnvironmentSettings envSetting EnvironmentSettings.inStreamingMode();TableEnvironment tEnv TableEnvironment.create(envSetting);// 设置为 UTC 时区tEnv.getConfig().setLocalTimeZone(ZoneId.of(UTC));// 设置为上海时区tEnv.getConfig().setLocalTimeZone(ZoneId.of(Asia/Shanghai));// 设置为 Los_Angeles 时区tEnv.getConfig().setLocalTimeZone(ZoneId.of(America/Los_Angeles));sql client
-- 设置为 UTC 时区
Flink SQL SET table.local-time-zone UTC;
[INFO] Execute statement succeed.-- 设置为上海时区
Flink SQL SET table.local-time-zone America/Los_Angeles;
[INFO] Execute statement succeed.-- 设置为Los_Angeles时区
Flink SQL SET table.local-time-zone Asia/Shanghai;
[INFO] Execute statement succeed.
session会话的时区设置在 Flink SQL 中非常有用 它的主要用法如下:
1、确定时间函数的返回值
session 会话中配置的时区会对以下函数生效。
LOCALTIMELOCALTIMESTAMPCURRENT_DATECURRENT_TIMECURRENT_TIMESTAMPCURRENT_ROW_TIMESTAMP()NOW()PROCTIME()
Flink SQL SET sql-client.execution.result-mode tableau;
[INFO] Execute statement succeed.Flink SQL CREATE VIEW MyView1 AS SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(), PROCTIME();
[INFO] Execute statement succeed.Flink SQL DESC MyView1;
-------------------------------------------------------------------------------
| name | type | null | key | extras | watermark |
-------------------------------------------------------------------------------
| LOCALTIME | TIME(0) | FALSE | | | |
| LOCALTIMESTAMP | TIMESTAMP(3) | FALSE | | | |
| CURRENT_DATE | DATE | FALSE | | | |
| CURRENT_TIME | TIME(0) | FALSE | | | |
| CURRENT_TIMESTAMP | TIMESTAMP_LTZ(3) | FALSE | | | |
| EXPR$5 | TIMESTAMP_LTZ(3) | FALSE | | | |
| EXPR$6 | TIMESTAMP_LTZ(3) | FALSE | | | |
| EXPR$7 | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
-------------------------------------------------------------------------------
8 rows in setFlink SQL SET table.local-time-zone UTC;
[INFO] Execute statement succeed.Flink SQL SELECT * FROM MyView1;
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| op | LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | EXPR$5 | EXPR$6 | EXPR$7 |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| I | 06:52:14 | 2023-11-10 06:52:14.144 | 2023-11-10 | 06:52:14 | 2023-11-10 06:52:14.144 | 2023-11-10 06:52:14.144 | 2023-11-10 06:52:14.144 | 2023-11-10 06:52:14.145 |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Received a total of 1 rowFlink SQL SET table.local-time-zone Asia/Shanghai;
[INFO] Execute statement succeed.Flink SQL SELECT * FROM MyView1;
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| op | LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | EXPR$5 | EXPR$6 | EXPR$7 |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| I | 14:52:52 | 2023-11-10 14:52:52.305 | 2023-11-10 | 14:52:52 | 2023-11-10 14:52:52.305 | 2023-11-10 14:52:52.305 | 2023-11-10 14:52:52.305 | 2023-11-10 14:52:52.305 |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Received a total of 1 row
2、TIMESTAMP_LTZ 字符串表示
当一个 TIMESTAMP_LTZ 值转为 string 格式时 session 中配置的时区会生效。 例如打印这个值将类型强制转化为 STRING 类型 将类型强制转换为 TIMESTAMP 将 TIMESTAMP 的值转化为 TIMESTAMP_LTZ 类型
Flink SQL CREATE VIEW MyView2 AS SELECT TO_TIMESTAMP_LTZ(4001, 3) AS ltz, TIMESTAMP 1970-01-01 00:00:01.001 AS ntz;
[INFO] Execute statement succeed.Flink SQL DESC MyView2;
-------------------------------------------------------
| name | type | null | key | extras | watermark |
-------------------------------------------------------
| ltz | TIMESTAMP_LTZ(3) | TRUE | | | |
| ntz | TIMESTAMP(3) | FALSE | | | |
-------------------------------------------------------
2 rows in setFlink SQL SET table.local-time-zone UTC;
[INFO] Execute statement succeed.Flink SQL SELECT * FROM MyView2;
------------------------------------------------------
| op | ltz | ntz |
------------------------------------------------------
| I | 1970-01-01 00:00:04.001 | 1970-01-01 00:00:01.001 |
------------------------------------------------------
Received a total of 1 rowFlink SQL SET table.local-time-zone Asia/Shanghai;
[INFO] Execute statement succeed.Flink SQL SELECT * FROM MyView2;
------------------------------------------------------
| op | ltz | ntz |
------------------------------------------------------
| I | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 |
------------------------------------------------------
Received a total of 1 rowFlink SQL CREATE VIEW MyView3 AS SELECT ltz, CAST(ltz AS TIMESTAMP(3)), CAST(ltz AS STRING), ntz, CAST(ntz AS TIMESTAMP_LTZ(3)) FROM MyView2;
[INFO] Execute statement succeed.Flink SQL SELECT * FROM MyView3;
----------------------------------------------------------------------------------------------------------------------------------------
| op | ltz | EXPR$1 | EXPR$2 | ntz | EXPR$4 |
----------------------------------------------------------------------------------------------------------------------------------------
| I | 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 | 1970-01-01 00:00:01.001 |
----------------------------------------------------------------------------------------------------------------------------------------
Received a total of 1 row
3、时间属性和时区
更多时间属性相关的详细介绍 请参考15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置如何处理更新结果、时态表、流上的join、流上的确定性以及查询配置 中的时间属性配置部分。
1、处理时间和时区
Flink SQL 使用函数 PROCTIME() 来定义处理时间属性 该函数返回的类型是 TIMESTAMP_LTZ 。 在 Flink1.13 之前 PROCTIME() 函数返回的类型是 TIMESTAMP 返回值是UTC时区下的 TIMESTAMP 。 例如 当上海的时间为 2021-11-11 12:00:00 时 PROCTIME() 显示的时间却是错误的 2021-11-11 04:00:00 。 这个问题在 Flink 1.13 中修复了 因此用户不用再去处理时区的问题了。 PROCTIME() 返回的是本地时区的时间 使用 TIMESTAMP_LTZ 类型也可以支持夏令时时间。
Flink SQL SET table.local-time-zone UTC;
[INFO] Execute statement succeed.Flink SQL SELECT PROCTIME();
-----------------------------
| op | EXPR$0 |
-----------------------------
| I | 2023-11-10 06:59:30.998 |
-----------------------------
Received a total of 1 rowFlink SQL SET table.local-time-zone Asia/Shanghai;
[INFO] Execute statement succeed.Flink SQL SELECT PROCTIME();
-----------------------------
| op | EXPR$0 |
-----------------------------
| I | 2023-11-10 14:59:54.031 |
-----------------------------
Received a total of 1 rowFlink SQL CREATE TABLE MyTable1 (item STRING,price DOUBLE,proctime as PROCTIME()) WITH (connector kafka,topic MyTable1,properties.bootstrap.servers 192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092,properties.group.id testGroup,scan.startup.mode earliest-offset,format csv);
[INFO] Execute statement succeed.Flink SQL CREATE VIEW MyView3 ASSELECTTUMBLE_START(proctime, INTERVAL 10 MINUTES) AS window_start,TUMBLE_END(proctime, INTERVAL 10 MINUTES) AS window_end,TUMBLE_PROCTIME(proctime, INTERVAL 10 MINUTES) as window_proctime,item,MAX(price) as max_priceFROM MyTable1GROUP BY TUMBLE(proctime, INTERVAL 10 MINUTES), item;
[INFO] Execute statement succeed.Flink SQL DESC MyView3;
-----------------------------------------------------------------------------
| name | type | null | key | extras | watermark |
-----------------------------------------------------------------------------
| window_start | TIMESTAMP(3) | FALSE | | | |
| window_end | TIMESTAMP(3) | FALSE | | | |
| window_proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
| item | STRING | TRUE | | | |
| max_price | DOUBLE | TRUE | | | |
-----------------------------------------------------------------------------
5 rows in set
在终端执行以下命令写入数据到 MyTable1
[alanchanserver1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic MyTable1
A,1.1
B,1.2
A,1.8
B,2.5
C,3.8Flink SQL SET table.local-time-zone UTC;
[INFO] Execute statement succeed.Flink SQL SELECT * FROM MyView3;
-----------------------------------------------------------------------------------------------------------------------------------------------
| op | window_start | window_end | window_proctime | item | max_price |
-----------------------------------------------------------------------------------------------------------------------------------------------
| I | 2023-11-10 07:10:00.000 | 2023-11-10 07:20:00.000 | 2023-11-10 07:20:00.000 | A | 1.8 |
| I | 2023-11-10 07:10:00.000 | 2023-11-10 07:20:00.000 | 2023-11-10 07:20:00.000 | C | 3.8 |
| I | 2023-11-10 07:10:00.000 | 2023-11-10 07:20:00.000 | 2023-11-10 07:20:00.001 | B | 2.5 |
received a total of 3 rowsFlink SQL SET table.local-time-zone Asia/Shanghai;
[INFO] Execute statement succeed.相比在 UTC 时区下的计算结果 在 Asia/Shanghai 时区下计算的窗口开始时间 窗口结束时间和窗口处理时间是不同的。
Flink SQL SELECT * FROM MyView3;
-----------------------------------------------------------------------------------------------------------------------------------------------
| op | window_start | window_end | window_proctime | item | max_price |
-----------------------------------------------------------------------------------------------------------------------------------------------
| I | 2023-11-10 15:10:00.000 | 2023-11-10 15:20:00.000 | 2023-11-10 15:20:00.000 | A | 1.8 |
| I | 2023-11-10 15:10:00.000 | 2023-11-10 15:20:00.000 | 2023-11-10 15:20:00.000 | C | 3.8 |
| I | 2023-11-10 15:10:00.000 | 2023-11-10 15:20:00.000 | 2023-11-10 15:20:00.001 | B | 2.5 |
received a total of 3 rows处理时间窗口是不确定的 每次运行都会返回不同的窗口和聚合结果。 以上的示例只用于说明时区如何影响处理时间窗口。 2、事件时间和时区
Flink 支持在 TIMESTAMP 列和 TIMESTAMP_LTZ 列上定义时间属性。
1、TIMESTAMP 上的事件时间属性
如果 source 中的时间用于表示年-月-日-小时-分钟-秒 通常是一个不带时区的字符串 例如: 2023-11-13 08:13:40.564。 推荐在 TIMESTAMP 列上定义事件时间属性。
准备测试环境即表、视图和数据
Flink SQL CREATE TABLE MyTable2 (item STRING,price DOUBLE,ts TIMESTAMP(3), -- TIMESTAMP data typeWATERMARK FOR ts AS ts - INTERVAL 10 SECOND) WITH (connector kafka,topic MyTable2,properties.bootstrap.servers 192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092,properties.group.id testGroup,scan.startup.mode earliest-offset,format csv);
[INFO] Execute statement succeed.Flink SQL CREATE VIEW MyView4 ASSELECTTUMBLE_START(ts, INTERVAL 10 MINUTES) AS window_start,TUMBLE_END(ts, INTERVAL 10 MINUTES) AS window_end,TUMBLE_ROWTIME(ts, INTERVAL 10 MINUTES) as window_rowtime,item,MAX(price) as max_priceFROM MyTable2GROUP BY TUMBLE(ts, INTERVAL 10 MINUTES), item;
[INFO] Execute statement succeed.Flink SQL DESC MyView4;
-----------------------------------------------------------------------
| name | type | null | key | extras | watermark |
-----------------------------------------------------------------------
| window_start | TIMESTAMP(3) | FALSE | | | |
| window_end | TIMESTAMP(3) | FALSE | | | |
| window_rowtime | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
| item | STRING | TRUE | | | |
| max_price | DOUBLE | TRUE | | | |
-----------------------------------------------------------------------
5 rows in set
在终端执行以下命令用于写入数据到 MyTable2
[alanchanserver1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic alan_MyTable2
A,1.1,2023-11-13 08:21:00
B,1.2,2023-11-13 08:22:00
A,1.8,2023-11-13 08:23:00
B,2.5,2023-11-13 08:24:00
C,3.8,2023-11-13 08:25:00
C,3.8,2023-11-13 08:41:00
查看UTC与Asia/Shanghai的查询结果
Flink SQL SET table.local-time-zone UTC;
[INFO] Execute statement succeed.Flink SQL SELECT * FROM MyView4;
-----------------------------------------------------------------------------------------------------------------------------------------------
| op | window_start | window_end | window_rowtime | item | max_price |
-----------------------------------------------------------------------------------------------------------------------------------------------
| I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 | A | 1.8 |
| I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 | B | 2.5 |
| I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 | C | 3.8 |
相比在 UTC 时区下的计算结果 在 Asia/Shanghai 时区下计算的窗口开始时间 窗口结束时间和窗口的 rowtime 是相同的。
Flink SQL SET table.local-time-zone Asia/Shanghai;
[INFO] Execute statement succeed.Flink SQL SELECT * FROM MyView4;
-----------------------------------------------------------------------------------------------------------------------------------------------
| op | window_start | window_end | window_rowtime | item | max_price |
-----------------------------------------------------------------------------------------------------------------------------------------------
| I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 | A | 1.8 |
| I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 | B | 2.5 |
| I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 | C | 3.8 |
2、TIMESTAMP_LTZ 上的事件时间属性
如果源数据中的时间为一个 epoch 时间 通常是一个 long 值 例如: 1618989564564 推荐将事件时间属性定义在 TIMESTAMP_LTZ 列上。
准备测试环境即准备表、视图和数据
Flink SQL CREATE TABLE MyTable3 (item STRING,price DOUBLE,ts BIGINT, -- long time value in epoch millisecondsts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL 10 SECOND) WITH (connector kafka,topic alan_MyTable3,properties.bootstrap.servers 192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092,properties.group.id testGroup,scan.startup.mode earliest-offset,format csv);
[INFO] Execute statement succeed.Flink SQL CREATE VIEW MyView5 AS SELECT TUMBLE_START(ts_ltz, INTERVAL 10 MINUTES) AS window_start, TUMBLE_END(ts_ltz, INTERVAL 10 MINUTES) AS window_end,TUMBLE_ROWTIME(ts_ltz, INTERVAL 10 MINUTES) as window_rowtime,item,MAX(price) as max_priceFROM MyTable3GROUP BY TUMBLE(ts_ltz, INTERVAL 10 MINUTES), item;
[INFO] Execute statement succeed.Flink SQL DESC MyView5;
---------------------------------------------------------------------------
| name | type | null | key | extras | watermark |
---------------------------------------------------------------------------
| window_start | TIMESTAMP(3) | FALSE | | | |
| window_end | TIMESTAMP(3) | FALSE | | | |
| window_rowtime | TIMESTAMP_LTZ(3) *ROWTIME* | TRUE | | | |
| item | STRING | TRUE | | | |
| max_price | DOUBLE | TRUE | | | |
---------------------------------------------------------------------------
5 rows in set
在终端执行以下命令用于写入数据到 MyTable3
[alanchanserver1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic alan_MyTable3
A,1.1,1699836971034 # The corresponding utc timestamp is 2023-11-13 08:56:xx
B,1.2,1699837031044 # The corresponding utc timestamp is 2023-11-13 08:57:xx
A,1.8,1699837091052 # The corresponding utc timestamp is 2023-11-13 08:58:xx
B,2.5,1699837091052 # The corresponding utc timestamp is 2023-11-13 08:59:xx
C,3.8,1699837211069 # The corresponding utc timestamp is 2023-11-13 09:00:xx
C,3.8,1699837271070 # The corresponding utc timestamp is 2023-11-13 09:01:xx
查看UTC与Asia/Shanghai的查询结果
Flink SQL SET table.local-time-zone UTC;
[INFO] Execute statement succeed.Flink SQL SELECT * FROM MyView5;
-----------------------------------------------------------------------------------------------------------------------------------------------
| op | window_start | window_end | window_rowtime | item | max_price |
-----------------------------------------------------------------------------------------------------------------------------------------------
| I | 2023-11-13 00:50:00.000 | 2023-11-13 01:00:00.000 | 2023-11-13 00:59:59.999 | A | 1.8 |
| I | 2023-11-13 00:50:00.000 | 2023-11-13 01:00:00.000 | 2023-11-13 00:59:59.999 | B | 2.5 |
相比在 UTC 时区下的计算结果 在 Asia/Shanghai 时区下计算的窗口开始时间 窗口结束时间和窗口的 rowtime 是不同的。
Flink SQL SET table.local-time-zone Asia/Shanghai;
[INFO] Execute statement succeed.Flink SQL SELECT * FROM MyView5;
-----------------------------------------------------------------------------------------------------------------------------------------------
| op | window_start | window_end | window_rowtime | item | max_price |
-----------------------------------------------------------------------------------------------------------------------------------------------
| I | 2023-11-13 08:50:00.000 | 2023-11-13 09:00:00.000 | 2023-11-13 08:59:59.999 | A | 1.8 |
| I | 2023-11-13 08:50:00.000 | 2023-11-13 09:00:00.000 | 2023-11-13 08:59:59.999 | B | 2.5 |
4、夏令时支持
Flink SQL支持在 TIMESTAMP_LTZ列上定义时间属性 基于这一特征Flink SQL 在窗口中使用 TIMESTAMP 和 TIMESTAMP_LTZ 类型优雅地支持了夏令时。
Flink 使用时间戳的字符格式来分割窗口并通过每条记录对应的 epoch 时间来分配窗口。 这意味着 Flink 窗口开始时间和窗口结束时间使用的是 TIMESTAMP 类型例如: TUMBLE_START 和 TUMBLE_END 窗口的时间属性使用的是 TIMESTAMP_LTZ 类型例如: TUMBLE_PROCTIME TUMBLE_ROWTIME。
给定一个 tumble window示例 在 Los_Angeles 时区下夏令时从 2021-03-14 02:00:00 开始
long epoch1 1615708800000L; // 2021-03-14 00:00:00
long epoch2 1615712400000L; // 2021-03-14 01:00:00
long epoch3 1615716000000L; // 2021-03-14 03:00:00, 手表往前拨一小时跳过 (2021-03-14 02:00:00)
long epoch4 1615719600000L; // 2021-03-14 04:00:00 在 Los_angele 时区下 tumble window [2021-03-14 00:00:00, 2021-03-14 00:04:00] 将会收集3个小时的数据 在其他非夏令时的时区下将会收集4个小时的数据用户只需要在 TIMESTAMP_LTZ 列上声明时间属性即可。
Flink 的所有窗口如 Hop window Session window Cumulative window都会遵循这种方式 Flink SQL 中的所有操作都很好地支持了 TIMESTAMP_LTZ 类型因此Flink可以非常优雅的支持夏令时。
5、Batch 模式和 Streaming 模式的区别
以下函数
LOCALTIMELOCALTIMESTAMPCURRENT_DATECURRENT_TIMECURRENT_TIMESTAMPNOW()
Flink 会根据执行模式来进行不同计算在 Streaming 模式下这些函数是每条记录都会计算一次但在 Batch 模式下只会在 query 开始时计算一次所有记录都使用相同的结果。
以下时间函数无论是在 Streaming 模式还是 Batch 模式下都会为每条记录计算一次结果
CURRENT_ROW_TIMESTAMP()PROCTIME()
以上简单的介绍了Flink 中关于时区的概念并以具体的示例进行说明。