Hi everyone, I'm Lao Yang. Today we'll study four groups of Flink SQL statements: EXPLAIN, SHOW, LOAD, and SET.

EXPLAIN clause

Use case: the EXPLAIN clause shows the logical plan and the optimized execution plan of a SQL query.

SQL syntax: EXPLAIN PLAN FOR <query>

Example:

```java
public class Explain_Test {

    public static void main(String[] args) throws Exception {
        FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
        flinkEnv.env().setParallelism(1);

        String sql = "CREATE TABLE source_table (\n"
                + "    user_id BIGINT COMMENT 'user id',\n"
                + "    name STRING COMMENT 'user name',\n"
                + "    server_timestamp BIGINT COMMENT 'user access timestamp',\n"
                + "    proctime AS PROCTIME()\n"
                + ") WITH (\n"
                + "    'connector' = 'datagen',\n"
                + "    'rows-per-second' = '1',\n"
                + "    'fields.name.length' = '1',\n"
                + "    'fields.user_id.min' = '1',\n"
                + "    'fields.user_id.max' = '10',\n"
                + "    'fields.server_timestamp.min' = '1',\n"
                + "    'fields.server_timestamp.max' = '100000'\n"
                + ");\n"
                + "\n"
                + "CREATE TABLE sink_table (\n"
                + "    user_id BIGINT,\n"
                + "    name STRING,\n"
                + "    server_timestamp BIGINT\n"
                + ") WITH (\n"
                + "    'connector' = 'print'\n"
                + ");\n"
                + "\n"
                + "EXPLAIN PLAN FOR\n"
                + "INSERT INTO sink_table\n"
                + "select user_id,\n"
                + "       name,\n"
                + "       server_timestamp\n"
                + "from (\n"
                + "      SELECT\n"
                + "          user_id,\n"
                + "          name,\n"
                + "          server_timestamp,\n"
                + "          row_number() over(partition by user_id order by proctime) as rn\n"
                + "      FROM source_table\n"
                + ")\n"
                + "where rn = 1";

        /*
         * Operators generated for this query:
         * {@link org.apache.flink.streaming.api.operators.KeyedProcessOperator}
         *     -- {@link org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction}
         */
        for (String innerSql : sql.split(";")) {
            TableResult tableResult = flinkEnv.streamTEnv().executeSql(innerSql);
            tableResult.print();
        }
    }
}
```

The output of the code above is as follows.
1. Abstract syntax tree:

```
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2])
   +- LogicalFilter(condition=[=($3, 1)])
      +- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
         +- LogicalTableScan(table=[[default_catalog, default_database, source_table]])
```

2. Optimized physical plan:

```
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- Calc(select=[user_id, name, server_timestamp])
   +- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])
      +- Exchange(distribution=[hash[user_id]])
         +- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])
            +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])
```

3. Optimized execution plan:

```
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- Calc(select=[user_id, name, server_timestamp])
   +- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])
      +- Exchange(distribution=[hash[user_id]])
         +- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])
            +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])
```

USE clause

Use case: readers familiar with MySQL will recognize this one. In MySQL, USE switches the current database; in Flink SQL it serves essentially the same purpose: switching the current Catalog or Database, and choosing which Modules are in use.

SQL syntax:

Switch catalog: USE CATALOG catalog_name
Use modules: USE MODULES module_name1[, module_name2, ...]
Switch database: USE database_name

Example:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// create a catalog
tEnv.executeSql("CREATE CATALOG cat1 WITH (...)");
tEnv.executeSql("SHOW CATALOGS").print();
// +-----------------+
// |    catalog name |
// +-----------------+
// | default_catalog |
// |            cat1 |
// +-----------------+

// change default catalog
tEnv.executeSql("USE CATALOG cat1");
tEnv.executeSql("SHOW DATABASES").print();
// databases are empty
// +---------------+
// | database name |
// +---------------+
// +---------------+

// create a database
tEnv.executeSql("CREATE DATABASE db1 WITH (...)");
tEnv.executeSql("SHOW DATABASES").print();
// +---------------+
// | database name |
// +---------------+
// |           db1 |
// +---------------+

// change default database
tEnv.executeSql("USE db1");

// change module resolution order and enabled status
tEnv.executeSql("USE MODULES hive");
tEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        hive |  true |
// |        core | false |
// +-------------+-------+
```

SHOW clause

Use case: again familiar from MySQL, where SHOW lists databases, tables, functions, and so on; Flink SQL is similar. Flink SQL supports the following SHOW statements.

SQL syntax:

SHOW CATALOGS: list all catalogs
SHOW CURRENT CATALOG: show the current catalog
SHOW DATABASES: list all databases in the current catalog
SHOW CURRENT DATABASE: show the current database
SHOW TABLES: list all tables in the current database
SHOW VIEWS: list all views
SHOW FUNCTIONS: list all functions
SHOW MODULES: list all modules (modules are used to extend the set of available functions, e.g. UDFs)

Example:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// show catalogs
tEnv.executeSql("SHOW CATALOGS").print();
// +-----------------+
// |    catalog name |
// +-----------------+
// | default_catalog |
// +-----------------+

// show current catalog
tEnv.executeSql("SHOW CURRENT CATALOG").print();
// +----------------------+
// | current catalog name |
// +----------------------+
// |      default_catalog |
// +----------------------+

// show databases
tEnv.executeSql("SHOW DATABASES").print();
// +------------------+
// |    database name |
// +------------------+
// | default_database |
// +------------------+

// show current database
tEnv.executeSql("SHOW CURRENT DATABASE").print();
// +-----------------------+
// | current database name |
// +-----------------------+
// |      default_database |
// +-----------------------+

// create a table
tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)");
// show tables
tEnv.executeSql("SHOW TABLES").print();
// +------------+
// | table name |
// +------------+
// |   my_table |
// +------------+

// create a view
tEnv.executeSql("CREATE VIEW my_view AS ...");
// show views
tEnv.executeSql("SHOW VIEWS").print();
// +-----------+
// | view name |
// +-----------+
// |   my_view |
// +-----------+

// show functions
tEnv.executeSql("SHOW FUNCTIONS").print();
// +---------------+
// | function name |
// +---------------+
// |           mod |
// |        sha256 |
// |           ... |
// +---------------+

// create a user defined function
tEnv.executeSql("CREATE FUNCTION f1 AS ...");
// show user defined functions
tEnv.executeSql("SHOW USER FUNCTIONS").print();
// +---------------+
// | function name |
// +---------------+
// |            f1 |
// |           ... |
// +---------------+

// show modules
tEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// |        core |
// +-------------+

// show full modules
tEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        core |  true |
// |        hive | false |
// +-------------+-------+
```

LOAD and UNLOAD clauses

Use case: the LOAD clause loads a built-in or user-defined module into the Flink SQL session, and the UNLOAD clause unloads one.

SQL syntax:

```sql
-- load
LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]
-- unload
UNLOAD MODULE module_name
```

LOAD example:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// load Flink's built-in Hive module
tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
tEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// |        core |
// |        hive |
// +-------------+
```

UNLOAD example:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// unload the only loaded module, core
tEnv.executeSql("UNLOAD MODULE core");
tEnv.executeSql("SHOW MODULES").print();
// the module list is now empty
```

SET and RESET clauses

Use case: the SET clause modifies Flink SQL session configuration, and the RESET clause restores all configuration to its defaults. Both can only be used in the SQL CLI; they exist so that users can stay purely in SQL without reaching for other tools or switching environments.

SQL syntax: SET (key = value)?

After starting a SQL CLI, you can run:

```
Flink SQL> SET table.planner = blink;
[INFO] Session property has been set.

Flink SQL> SET;
table.planner=blink;

Flink SQL> RESET table.planner;
[INFO] Session property has been reset.

Flink SQL> RESET;
```

To recap, this article covered:

EXPLAIN clause
USE clause
SHOW clause
LOAD and UNLOAD clauses
SET and RESET clauses
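As a closing aside, the EXPLAIN output above contains a Deduplicate(keep=[FirstRow]) operator, implemented by ProcTimeDeduplicateKeepFirstRowFunction. The following is a minimal plain-Java sketch of the keep-first-row semantics that operator applies, not Flink's actual implementation; the class name KeepFirstRowDedup is made up for illustration. Per key (here user_id), only the first record ever seen is emitted, and every later record with the same key is dropped:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration only: a sketch of keep-first-row deduplication semantics,
// NOT Flink's ProcTimeDeduplicateKeepFirstRowFunction (which keeps its
// "seen" flag in keyed state with configurable TTL).
public class KeepFirstRowDedup {
    private final Set<Long> seenKeys = new HashSet<>();

    /** Returns true if this is the first record for its key, i.e. it should be emitted. */
    public boolean processRecord(long userId) {
        // Set.add() returns false when the key was already present,
        // meaning a row for this user_id was emitted before.
        return seenKeys.add(userId);
    }

    public static void main(String[] args) {
        KeepFirstRowDedup dedup = new KeepFirstRowDedup();
        List<Long> emitted = new ArrayList<>();
        for (long userId : new long[] {1, 2, 1, 3, 2}) {
            if (dedup.processRecord(userId)) {
                emitted.add(userId);
            }
        }
        System.out.println(emitted); // [1, 2, 3]
    }
}
```

In the real operator the per-key state lives in Flink's managed keyed state (so it survives failures and can expire via state TTL), but the emit/drop decision is the same.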
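The LOAD / UNLOAD / USE MODULES / SHOW MODULES behavior can also be sketched as a toy model. This is not Flink's ModuleManager, and it deliberately simplifies one thing: real USE MODULES also changes the function-resolution order, while this sketch only tracks the used flag. The class name ModuleRegistry is hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: a toy model of module loading, mimicking the
// observable behavior of LOAD MODULE / UNLOAD MODULE / USE MODULES /
// SHOW MODULES. Not Flink's ModuleManager.
public class ModuleRegistry {
    // loaded modules in load order; value = "used" flag (as in SHOW FULL MODULES)
    private final Map<String, Boolean> modules = new LinkedHashMap<>();

    public ModuleRegistry() {
        modules.put("core", true); // core is loaded and used by default
    }

    /** LOAD MODULE name */
    public void load(String name) { modules.putIfAbsent(name, true); }

    /** UNLOAD MODULE name */
    public void unload(String name) { modules.remove(name); }

    /** USE MODULES n1, n2, ...: only the listed modules stay "used". */
    public void use(List<String> names) {
        for (String name : modules.keySet()) modules.put(name, false);
        for (String name : names) {
            if (modules.containsKey(name)) modules.put(name, true);
        }
    }

    /** SHOW MODULES: only the used ones. */
    public List<String> showModules() {
        List<String> used = new ArrayList<>();
        modules.forEach((name, isUsed) -> { if (isUsed) used.add(name); });
        return used;
    }

    public static void main(String[] args) {
        ModuleRegistry registry = new ModuleRegistry();
        registry.load("hive");
        System.out.println(registry.showModules()); // [core, hive]
        registry.use(List.of("hive"));              // core stays loaded but unused
        System.out.println(registry.showModules()); // [hive]
    }
}
```

This mirrors the SHOW FULL MODULES table from the USE example above: after USE MODULES hive, core is still loaded (it would reappear in SHOW FULL MODULES with used = false) but no longer used.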
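Finally, the SET / RESET session-property semantics described above can be sketched with a small key-value store that remembers its defaults. Again an illustration only, not the SQL CLI's implementation; SessionProperties is a hypothetical name:

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: a toy session-property store with SET / RESET
// semantics. Not the Flink SQL CLI's implementation.
public class SessionProperties {
    private final Map<String, String> defaults;
    private final Map<String, String> current;

    public SessionProperties(Map<String, String> defaults) {
        this.defaults = new HashMap<>(defaults);
        this.current = new HashMap<>(defaults);
    }

    /** SET key = value */
    public void set(String key, String value) { current.put(key, value); }

    /** SET (no arguments): list all session properties */
    public Map<String, String> list() { return new HashMap<>(current); }

    /** RESET key: restore a single property to its default */
    public void reset(String key) {
        if (defaults.containsKey(key)) {
            current.put(key, defaults.get(key));
        } else {
            current.remove(key); // no default: the property is simply dropped
        }
    }

    /** RESET (no arguments): restore every property to its default */
    public void resetAll() {
        current.clear();
        current.putAll(defaults);
    }

    public static void main(String[] args) {
        SessionProperties props =
                new SessionProperties(Map.of("table.planner", "default"));
        props.set("table.planner", "blink");
        System.out.println(props.list().get("table.planner")); // blink
        props.reset("table.planner");
        System.out.println(props.list().get("table.planner")); // default
    }
}
```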