First, some background. Think about your offline Hive environment: have you ever run into a time zone problem there? I haven't so far, because the underlying data integration system already takes care of it, and the ODS-layer tables you receive have already been formatted in your local time zone. For example, when you see a Hive table partition dated 2022-01-01, you can safely assume that the data in it corresponds to 2022-01-01 in your local time zone.

In Flink, however, time zones deserve special attention: if you are not careful, it is easy to get them wrong. This section on SQL time zones aims to help you understand the time zone issues in two scenarios, namely the time values Flink SQL displays to users and the way time windows are divided. Getting them wrong leads to incorrect displayed times and incorrect window boundaries, so a solid understanding of this section will help you avoid time zone errors.

All of the time functions in the following SQL are affected by the time zone parameter table.local-time-zone, so that both the time finally shown to the user and the window division follow the time zone the user has configured. Running them in the Flink SQL client gives:

```
Flink SQL> SET sql-client.execution.result-mode=tableau;
Flink SQL> CREATE VIEW MyView1 AS SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(), PROCTIME();
Flink SQL> DESC MyView1;
+-------------------------+-----------------------------+-------+-----+--------+-----------+
| name                    | type                        | null  | key | extras | watermark |
+-------------------------+-----------------------------+-------+-----+--------+-----------+
| LOCALTIME               | TIME(0)                     | false |     |        |           |
| LOCALTIMESTAMP          | TIMESTAMP(3)                | false |     |        |           |
| CURRENT_DATE            | DATE                        | false |     |        |           |
| CURRENT_TIME            | TIME(0)                     | false |     |        |           |
| CURRENT_TIMESTAMP       | TIMESTAMP_LTZ(3)            | false |     |        |           |
| CURRENT_ROW_TIMESTAMP() | TIMESTAMP_LTZ(3)            | false |     |        |           |
| NOW()                   | TIMESTAMP_LTZ(3)            | false |     |        |           |
| PROCTIME()              | TIMESTAMP_LTZ(3) *PROCTIME* | false |     |        |           |
+-------------------------+-----------------------------+-------+-----+--------+-----------+

Flink SQL> SET table.local-time-zone=UTC;
Flink SQL> SELECT * FROM MyView1;
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| LOCALTIME | LOCALTIMESTAMP          | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP       | CURRENT_ROW_TIMESTAMP() | NOW()                   | PROCTIME()              |
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| 15:18:36  | 2021-04-15 15:18:36.384 | 2021-04-15   | 15:18:36     | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 |
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+

Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT * FROM MyView1;
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| LOCALTIME | LOCALTIMESTAMP          | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP       | CURRENT_ROW_TIMESTAMP() | NOW()                   | PROCTIME()              |
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| 23:18:36  | 2021-04-15 23:18:36.384 | 2021-04-15   | 23:18:36     | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 |
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
```

Next, compare TIMESTAMP_LTZ with TIMESTAMP. TO_TIMESTAMP_LTZ(4001, 3) interprets 4001 as epoch milliseconds, so ltz is an absolute instant whose display depends on the session time zone, whereas the TIMESTAMP literal ntz is a plain wall-clock value that is displayed the same way in every time zone:

```
Flink SQL> CREATE VIEW MyView2 AS SELECT TO_TIMESTAMP_LTZ(4001, 3) AS ltz, TIMESTAMP '1970-01-01 00:00:01.001' AS ntz;
Flink SQL> DESC MyView2;
+------+------------------+-------+-----+--------+-----------+
| name | type             | null  | key | extras | watermark |
+------+------------------+-------+-----+--------+-----------+
| ltz  | TIMESTAMP_LTZ(3) | true  |     |        |           |
| ntz  | TIMESTAMP(3)     | false |     |        |           |
+------+------------------+-------+-----+--------+-----------+

Flink SQL> SET table.local-time-zone=UTC;
Flink SQL> SELECT * FROM MyView2;
+-------------------------+-------------------------+
| ltz                     | ntz                     |
+-------------------------+-------------------------+
| 1970-01-01 00:00:04.001 | 1970-01-01 00:00:01.001 |
+-------------------------+-------------------------+

Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT * FROM MyView2;
+-------------------------+-------------------------+
| ltz                     | ntz                     |
+-------------------------+-------------------------+
| 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 |
+-------------------------+-------------------------+
```

Casting between the two types (the session time zone is still Asia/Shanghai at this point): casting a TIMESTAMP_LTZ to TIMESTAMP or STRING yields its wall-clock time in the session time zone, while casting a TIMESTAMP to TIMESTAMP_LTZ interprets it in the session time zone, so it displays unchanged:

```
Flink SQL> CREATE VIEW MyView3 AS SELECT ltz, CAST(ltz AS TIMESTAMP(3)), CAST(ltz AS STRING), ntz, CAST(ntz AS TIMESTAMP_LTZ(3)) FROM MyView2;
Flink SQL> DESC MyView3;
+-------------------------------+------------------+-------+-----+--------+-----------+
| name                          | type             | null  | key | extras | watermark |
+-------------------------------+------------------+-------+-----+--------+-----------+
| ltz                           | TIMESTAMP_LTZ(3) | true  |     |        |           |
| CAST(ltz AS TIMESTAMP(3))     | TIMESTAMP(3)     | true  |     |        |           |
| CAST(ltz AS STRING)           | STRING           | true  |     |        |           |
| ntz                           | TIMESTAMP(3)     | false |     |        |           |
| CAST(ntz AS TIMESTAMP_LTZ(3)) | TIMESTAMP_LTZ(3) | false |     |        |           |
+-------------------------------+------------------+-------+-----+--------+-----------+

Flink SQL> SELECT * FROM MyView3;
+-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
| ltz                     | CAST(ltz AS TIMESTAMP(3)) | CAST(ltz AS STRING)     | ntz                     | CAST(ntz AS TIMESTAMP_LTZ(3)) |
+-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
| 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001   | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 | 1970-01-01 00:00:01.001       |
+-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
```

Event time comes in two flavours: Flink SQL jobs whose event-time column is TIMESTAMP (a time without time zone information) and jobs whose event-time column is TIMESTAMP_LTZ (a time with local time zone information).

TIMESTAMP (without time zone information):

```
Flink SQL> CREATE TABLE MyTable2 (
             item STRING,
             price DOUBLE,
             ts TIMESTAMP(3), -- event time of type TIMESTAMP
             WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
           ) WITH (
             'connector' = 'socket',
             'hostname' = '127.0.0.1',
             'port' = '9999',
             'format' = 'csv'
           );

Flink SQL> CREATE VIEW MyView4 AS
           SELECT
             TUMBLE_START(ts, INTERVAL '10' MINUTES) AS window_start,
             TUMBLE_END(ts, INTERVAL '10' MINUTES) AS window_end,
             TUMBLE_ROWTIME(ts, INTERVAL '10' MINUTES) AS window_rowtime,
             item,
             MAX(price) AS max_price
           FROM MyTable2
           GROUP BY TUMBLE(ts, INTERVAL '10' MINUTES), item;

Flink SQL> DESC MyView4;
+----------------+------------------------+------+-----+--------+-----------+
| name           | type                   | null | key | extras | watermark |
+----------------+------------------------+------+-----+--------+-----------+
| window_start   | TIMESTAMP(3)           | true |     |        |           |
| window_end     | TIMESTAMP(3)           | true |     |        |           |
| window_rowtime | TIMESTAMP(3) *ROWTIME* | true |     |        |           |
| item           | STRING                 | true |     |        |           |
| max_price      | DOUBLE                 | true |     |        |           |
+----------------+------------------------+------+-----+--------+-----------+
```

Write data into MyTable2:

```
> nc -lk 9999
A,1.1,2021-04-15 14:01:00
B,1.2,2021-04-15 14:02:00
A,1.8,2021-04-15 14:03:00
B,2.5,2021-04-15 14:04:00
C,3.8,2021-04-15 14:05:00
```

Since the watermark trails the largest ts seen by 10 seconds, the [14:00, 14:10) window is only emitted after a record with a later timestamp (for example 2021-04-15 14:11:00) pushes the watermark past the window end. The same applies to MyTable3 below.

The final results are:

```
Flink SQL> SET table.local-time-zone=UTC;
Flink SQL> SELECT * FROM MyView4;
+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start            | window_end              | window_rowtime          | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A    | 1.8       |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B    | 2.5       |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C    | 3.8       |
+-------------------------+-------------------------+-------------------------+------+-----------+

Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT * FROM MyView4;
+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start            | window_end              | window_rowtime          | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A    | 1.8       |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B    | 2.5       |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C    | 3.8       |
+-------------------------+-------------------------+-------------------------+------+-----------+
```

As the results show, with windows on a TIMESTAMP column (no time zone information), the window start, window end and window rowtime computed under UTC are identical to those computed under Asia/Shanghai.

TIMESTAMP_LTZ (with local time zone information):

```
Flink SQL> CREATE TABLE MyTable3 (
             item STRING,
             price DOUBLE,
             ts BIGINT, -- epoch milliseconds as a long
             ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), -- converted to TIMESTAMP_LTZ
             WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND
           ) WITH (
             'connector' = 'socket',
             'hostname' = '127.0.0.1',
             'port' = '9999',
             'format' = 'csv'
           );

Flink SQL> CREATE VIEW MyView5 AS
           SELECT
             TUMBLE_START(ts_ltz, INTERVAL '10' MINUTES) AS window_start,
             TUMBLE_END(ts_ltz, INTERVAL '10' MINUTES) AS window_end,
             TUMBLE_ROWTIME(ts_ltz, INTERVAL '10' MINUTES) AS window_rowtime,
             item,
             MAX(price) AS max_price
           FROM MyTable3
           GROUP BY TUMBLE(ts_ltz, INTERVAL '10' MINUTES), item;

Flink SQL> DESC MyView5;
+----------------+----------------------------+-------+-----+--------+-----------+
| name           | type                       | null  | key | extras | watermark |
+----------------+----------------------------+-------+-----+--------+-----------+
| window_start   | TIMESTAMP(3)               | false |     |        |           |
| window_end     | TIMESTAMP(3)               | false |     |        |           |
| window_rowtime | TIMESTAMP_LTZ(3) *ROWTIME* | true  |     |        |           |
| item           | STRING                     | true  |     |        |           |
| max_price      | DOUBLE                     | true  |     |        |           |
+----------------+----------------------------+-------+-----+--------+-----------+
```

Write data into MyTable3 (the comments are annotations, not part of the input):

```
A,1.1,1618495260000  # 2021-04-15 14:01:00 in UTC
B,1.2,1618495320000  # 2021-04-15 14:02:00 in UTC
A,1.8,1618495380000  # 2021-04-15 14:03:00 in UTC
B,2.5,1618495440000  # 2021-04-15 14:04:00 in UTC
C,3.8,1618495500000  # 2021-04-15 14:05:00 in UTC
```

The final results are:

```
Flink SQL> SET table.local-time-zone=UTC;
Flink SQL> SELECT * FROM MyView5;
+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start            | window_end              | window_rowtime          | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A    | 1.8       |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B    | 2.5       |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C    | 3.8       |
+-------------------------+-------------------------+-------------------------+------+-----------+

Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT * FROM MyView5;
+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start            | window_end              | window_rowtime          | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 | A    | 1.8       |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 | B    | 2.5       |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 | C    | 3.8       |
+-------------------------+-------------------------+-------------------------+------+-----------+
```

As the results show, with windows on a TIMESTAMP_LTZ column, the window start, window end and window rowtime computed under UTC differ from those computed under Asia/Shanghai: each is expressed in the configured session time zone.

In Flink SQL, a processing-time attribute column is declared with the PROCTIME() function, whose return type is TIMESTAMP_LTZ.

Note: before Flink 1.13, PROCTIME() returned a TIMESTAMP whose value was in UTC. For example, when the wall clock in Shanghai read 2021-03-01 12:00:00, PROCTIME() showed 2021-03-01 04:00:00, so using it directly gave wrong results. Flink 1.13 fixed this by making TIMESTAMP_LTZ the return type of PROCTIME(): Flink now picks up the session time zone automatically, and users no longer have to convert time zones themselves.

For example:

```
Flink SQL> SET table.local-time-zone=UTC;
Flink SQL> SELECT PROCTIME();
+-------------------------+
| PROCTIME()              |
+-------------------------+
| 2021-04-15 14:48:31.387 |
+-------------------------+

Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT PROCTIME();
+-------------------------+
| PROCTIME()              |
+-------------------------+
| 2021-04-15 22:48:31.387 |
+-------------------------+

Flink SQL> CREATE TABLE MyTable1 (
             item STRING,
             price DOUBLE,
             proctime AS PROCTIME()
           ) WITH (
             'connector' = 'socket',
             'hostname' = '127.0.0.1',
             'port' = '9999',
             'format' = 'csv'
           );

Flink SQL> CREATE VIEW MyView6 AS
           SELECT
             TUMBLE_START(proctime, INTERVAL '10' MINUTES) AS window_start,
             TUMBLE_END(proctime, INTERVAL '10' MINUTES) AS window_end,
             TUMBLE_PROCTIME(proctime, INTERVAL '10' MINUTES) AS window_proctime,
             item,
             MAX(price) AS max_price
           FROM MyTable1
           GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTES), item;

Flink SQL> DESC MyView6;
+-----------------+-----------------------------+-------+-----+--------+-----------+
| name            | type                        | null  | key | extras | watermark |
+-----------------+-----------------------------+-------+-----+--------+-----------+
| window_start    | TIMESTAMP(3)                | false |     |        |           |
| window_end      | TIMESTAMP(3)                | false |     |        |           |
| window_proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false |     |        |           |
| item            | STRING                      | true  |     |        |           |
| max_price       | DOUBLE                      | true  |     |        |           |
+-----------------+-----------------------------+-------+-----+--------+-----------+
```

Write data into MyTable1:

```
> nc -lk 9999
A,1.1
B,1.2
A,1.8
B,2.5
C,3.8
```

The output is:

```
Flink SQL> SET table.local-time-zone=UTC;
Flink SQL> SELECT * FROM MyView6;
+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start            | window_end              | window_proctime         | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.005 | A    | 1.8       |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 | B    | 2.5       |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 | C    | 3.8       |
+-------------------------+-------------------------+-------------------------+------+-----------+

Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT * FROM MyView6;
+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start            | window_end              | window_proctime         | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.005 | A    | 1.8       |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 | B    | 2.5       |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 | C    | 3.8       |
+-------------------------+-------------------------+-------------------------+------+-----------+
```

As the results show, with processing-time windows, the window start, window end and window proctime computed under UTC differ from those computed under Asia/Shanghai: each follows the configured session time zone.

Finally, the functions LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP and NOW() are evaluated once per record in streaming mode, but in batch mode they are evaluated only once, when the query starts, and every record uses that same time value. The following time functions, by contrast, are evaluated once per record in both streaming and batch mode: CURRENT_ROW_TIMESTAMP() and PROCTIME().
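The TIMESTAMP versus TIMESTAMP_LTZ behaviour shown in the MyView2 and MyView3 sessions above can be mimicked in plain Python. This is a minimal sketch, not Flink's implementation: it assumes a TIMESTAMP_LTZ can be modelled as epoch milliseconds (an absolute instant) and a TIMESTAMP as a naive `datetime` (a wall-clock value), and it assumes Asia/Shanghai can be treated as a fixed UTC+8 offset, which holds for the 1970 and 2021 dates used here. The helper names (`show_ltz`, `ltz_to_ntz`, `ntz_to_ltz`, and so on) are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Assumption: Asia/Shanghai is modelled as a fixed UTC+8 offset. That holds for
# the 1970 and 2021 dates used in this section (no daylight saving time then).
UTC = timezone.utc
SHANGHAI = timezone(timedelta(hours=8))
EPOCH = datetime(1970, 1, 1, tzinfo=UTC)
MS = timedelta(milliseconds=1)


def fmt(dt: datetime) -> str:
    """Format like the SQL client output: 'yyyy-MM-dd HH:mm:ss.SSS', no offset."""
    return dt.strftime("%Y-%m-%d %H:%M:%S.") + f"{dt.microsecond // 1000:03d}"


def show_ltz(epoch_millis: int, session_tz: timezone) -> str:
    """TIMESTAMP_LTZ: an absolute instant, displayed in the session time zone."""
    return fmt((EPOCH + epoch_millis * MS).astimezone(session_tz))


def show_ntz(wall_clock: datetime) -> str:
    """TIMESTAMP: a naive wall-clock value; the session time zone is ignored."""
    return fmt(wall_clock)


def ltz_to_ntz(epoch_millis: int, session_tz: timezone) -> datetime:
    """CAST(ltz AS TIMESTAMP): keep the wall clock the instant shows in the session zone."""
    return (EPOCH + epoch_millis * MS).astimezone(session_tz).replace(tzinfo=None)


def ntz_to_ltz(wall_clock: datetime, session_tz: timezone) -> int:
    """CAST(ntz AS TIMESTAMP_LTZ): interpret the wall clock in the session zone."""
    return (wall_clock.replace(tzinfo=session_tz) - EPOCH) // MS


if __name__ == "__main__":
    ntz = datetime(1970, 1, 1, 0, 0, 1, 1000)  # TIMESTAMP '1970-01-01 00:00:01.001'
    for name, tz in (("UTC", UTC), ("Asia/Shanghai (as UTC+8)", SHANGHAI)):
        print(name, "| ltz:", show_ltz(4001, tz), "| ntz:", show_ntz(ntz))
```

Running it prints the same ltz/ntz strings the SQL client showed for MyView2. The same helpers also reproduce the pre-1.13 PROCTIME() symptom from the note above: taking the Shanghai wall clock 2021-03-01 12:00:00 through `ntz_to_ltz(..., SHANGHAI)` and rendering it with `show_ltz(..., UTC)` gives 2021-03-01 04:00:00.000.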
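The event-time window results above, and why an event-time window only produces output once the watermark has passed its end, can be illustrated the same way. This is a minimal sketch under stated assumptions, not Flink's window operator: it assumes that TUMBLE on a TIMESTAMP_LTZ column aligns windows on the session time zone's wall clock (consistent with the MyView5 output above), and that an event-time window fires once the watermark, modelled here as the largest event time seen minus the 10-second delay, reaches the window end minus 1 ms. `tumble_ltz` and `window_fires` are hypothetical names, and Asia/Shanghai is again approximated as a fixed UTC+8 offset.

```python
from datetime import datetime, timedelta, timezone

# Assumption: fixed UTC+8 stand-in for Asia/Shanghai (no DST on these dates).
UTC = timezone.utc
SHANGHAI = timezone(timedelta(hours=8))
EPOCH = datetime(1970, 1, 1, tzinfo=UTC)
LOCAL_EPOCH = datetime(1970, 1, 1)  # naive wall-clock origin used for alignment
MS = timedelta(milliseconds=1)


def tumble_ltz(epoch_millis: int, size: timedelta, session_tz: timezone):
    """Hypothetical model of TUMBLE on a TIMESTAMP_LTZ column.

    Converts the instant to the session wall clock and aligns it down to a
    multiple of `size`. Returns (window_start, window_end, window_rowtime)
    as naive wall-clock datetimes.
    """
    local = (EPOCH + epoch_millis * MS).astimezone(session_tz).replace(tzinfo=None)
    offset = (local - LOCAL_EPOCH) // MS  # wall-clock milliseconds since 1970-01-01
    start = LOCAL_EPOCH + (offset - offset % (size // MS)) * MS
    end = start + size
    return start, end, end - MS  # rowtime is the last millisecond of the window


def window_fires(max_event_millis: int, window_end_millis: int, delay: timedelta) -> bool:
    """Assumed rule: the window fires once watermark (max ts - delay) >= end - 1 ms."""
    watermark = max_event_millis - delay // MS
    return watermark >= window_end_millis - 1


if __name__ == "__main__":
    ten_min = timedelta(minutes=10)
    for name, tz in (("UTC", UTC), ("Asia/Shanghai (as UTC+8)", SHANGHAI)):
        print(name, tumble_ltz(1618495260000, ten_min, tz))
```

For a 10-minute window, aligning on UTC or on UTC+8 yields the same instants, so only the rendering differs. With a one-day window, though, a record at 2021-04-15 20:00:00 UTC (epoch 1618516800000) falls into the 2021-04-15 window under UTC but into the 2021-04-16 window under Asia/Shanghai in this sketch, which is the kind of daily-aggregation mismatch a wrong session time zone causes. Likewise, `window_fires(1618495500000, 1618495800000, timedelta(seconds=10))` is `False` (the latest record is at 14:05), while a record at 14:11 (`1618495860000`) makes it `True`.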