SQL 的所然时间时间语义 hello,我是语义老羊,今天跟着老羊的所然时间思路学习 Flink SQL 的时间语义: time 小伙伴萌要注意到: 讲到这里,xdm 会问,博主上面写的 3 种时间属性到底对我们的任务有啥影响呢?3 种时间属性的应用场景是啥? 先说结论,在 Flink 中时间的作用: 博主以 滚动窗口 的聚合任务为例来介绍一下事件时间和处理时间的对比区别。 1. 事件时间案例:还是以之前的 clicks 表拿来举例。 tumble window 上面这个案例的窗口大小是 1 小时,需求方需要按照用户点击时间戳 cTime 划分数据(划分滚动窗口),然后计算出 count 聚合结果(这样计算能反映出事件的真实发生时间),那么就需要把 cTime 设置为窗口的划分时间戳,即代码中 tumble(cTime, interval 1 hour)。 上面这种就叫做事件时间。即用数据中自带的时间戳进行窗口的划分(点击操作真实的发生时间)。 后续 Flink SQL 任务在运行的过程中也会实际按照 cTime 的当前时间作为一小时窗口结束触发条件并计算一个小时窗口内的数据。 2.处理时间案例:还是以之前的 clicks 表拿来举例。 还是上面那个案例,但是这次需求方不需要按照数据上的时间戳划分数据(划分滚动窗口),只需要数据来了之后, 在 Flink 机器上的时间作为一小时窗口结束的书法条件并计算。 那么这种触发机制就是处理时间。 3. 摄入时间案例:在 Flink 从外部数据源读取到数据时,给这条数据带上的当前数据源算子的本地时间戳。下游可以用这个时间戳进行窗口聚合,不过这种几乎不使用。 如果要满足 Flink SQL 时间窗口类的聚合操作,SQL 或 Table API 中的 数据源表 就需要提供时间属性(相当于我们把这个时间属性在 数据源表 上面进行声明),以及支持时间相关的操作。 那么来看看 Flink SQL 为我们提供的两种指定时间戳的方式: 一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。 来看看 Flink 中如何指定事件时间。 1. CREATE TABLE DDL 指定时间戳的方式。 CREATE TABLE user_actions ( user_name STRING, data STRING, user_action_time TIMESTAMP(3), -- 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒 -- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型 WATERMARK FOR user_action_time AS user_action_time - INTERVAL 5 SECOND ) WITH ( ... ); SELECT TUMBLE_START(user_action_time, INTERVAL 10 MINUTE), COUNT(DISTINCT user_name) FROM user_actions -- 然后就可以在窗口算子中使用 user_action_time 从上面这条语句可以看到,如果想使用事件时间,那么我们的时间戳类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型。很多小伙伴会想到,我们的时间戳一般不都是秒或者是毫秒(BIGINT 类型)嘛,那这种情况怎么办? 解决方案必须要有啊。如下。 CREATE TABLE user_actions ( user_name STRING, data STRING, -- 1. 这个 ts 就是常见的毫秒级别时间戳 ts BIGINT, -- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型 time_ltz AS TO_TIMESTAMP_LTZ(ts, 3), -- 3. 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒 -- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型 WATERMARK FOR time_ltz AS time_ltz - INTERVAL 5 SECOND ) WITH ( ... ); SELECT TUMBLE_START(time_ltz, INTERVAL 10 MINUTE), COUNT(DISTINCT user_name) FROM user_actions 2.DataStream 中指定事件时间。 之前介绍了 Table 和 DataStream 可以互转,那么 Flink 也提供了一个能力,就是在 Table 转为 DataStream 时,指定时间戳字段。如下案例: public class DataStreamSourceEventTimeTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); // 1. 分配 watermark DataStream r = env.addSource(new UserDefinedSource()) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(0L)) { @Override public long extractTimestamp(Row element) { return (long) element.getField("f2"); } }); // 2. 使用 f2.rowtime 的方式将 f2 字段指为事件时间时间戳 Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2.rowtime"); tEnv.createTemporaryView("source_table", sourceTable); // 3. 在 tumble window 中使用 f2 String tumbleWindowSql = "SELECT TUMBLE_START(f2, INTERVAL 5 SECOND), COUNT(DISTINCT f0)\n" + "FROM source_table\n" + "GROUP BY TUMBLE(f2, INTERVAL 5 SECOND)" ; Table resultTable = tEnv.sqlQuery(tumbleWindowSql); tEnv.toDataStream(resultTable, Row.class).print(); env.execute(); } private static class UserDefinedSource implements SourceFunction, ResultTypeQueryable { private volatile boolean isCancel; @Override public void run(SourceContextsourceContext) throws Exception { int i = 0; while (!this.isCancel) { sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis())); Thread.sleep(10L); i++; } } @Override public void cancel() { this.isCancel = true; } @Override public TypeInformation getProducedType() { return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class), TypeInformation.of(Long.class)); } } 来看看 Flink SQL 中如何指定处理时间。 1.CREATE TABLE DDL 指定时间戳的方式。 CREATE TABLE user_actions ( user_name STRING, data STRING, -- 使用下面这句来将 user_action_time 声明为处理时间 user_action_time AS PROCTIME() ) WITH ( ... ); SELECT TUMBLE_START(user_action_time, INTERVAL 10 MINUTE), COUNT(DISTINCT user_name) FROM user_actions -- 然后就可以在窗口算子中使用 user_action_time ⭐ DataStream 中指定处理时间。 public class DataStreamSourceProcessingTimeTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); // 1. 分配 watermark DataStream r = env.addSource(new UserDefinedSource()); // 2. 使用 proctime.proctime 的方式将 f2 字段指为处理时间时间戳 Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2, proctime.proctime"); tEnv.createTemporaryView("source_table", sourceTable); // 3. 在 tumble window 中使用 f2 String tumbleWindowSql = "SELECT TUMBLE_START(proctime, INTERVAL 5 SECOND), COUNT(DISTINCT f0)\n" + "FROM source_table\n" + "GROUP BY TUMBLE(proctime, INTERVAL 5 SECOND)" ; Table resultTable = tEnv.sqlQuery(tumbleWindowSql); tEnv.toDataStream(resultTable, Row.class).print(); env.execute(); } private static class UserDefinedSource implements SourceFunction, ResultTypeQueryable { private volatile boolean isCancel; @Override public void run(SourceContext sourceContext) throws Exception { int i = 0; while (!this.isCancel) { sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis())); Thread.sleep(10L); i++; } } @Override public void cancel() { this.isCancel = true; } @Override public TypeInformation getProducedType() { return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class), TypeInformation.of(Long.class)); } }