1.19.7. Table API / 1.19.8. SQL: Data Types, Reserved Keywords, Query Statements (Specifying a Query, Executing a Query, Syntax, Operators, No-Ranking Output Optimization, Deduplication, Group Windows, Time Attributes, Selecting Group Window Start and End Timestamps, Pattern Matching)
1.19.7. Table API
1.19.8. SQL
1.19.8.1. Overview
1.19.8.1.1. SQL
1.19.8.1.2. Data Types
1.19.8.1.3. Reserved Keywords
1.19.8.2. Query Statements
1.19.8.2.1. Specifying a Query
1.19.8.2.2. Executing a Query
1.19.8.2.3. Syntax
1.19.8.2.4. Operators
1.19.8.2.5. No-Ranking Output Optimization
1.19.8.2.6. Deduplication
1.19.8.2.7. Group Windows
1.19.8.2.7.1. Time Attributes
1.19.8.2.7.2. Selecting Group Window Start and End Timestamps
1.19.8.2.8. Pattern Matching
1.19.7. Table API
Omitted here for now; for details see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/tableApi.html
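Since this section is only a stub, here is a minimal Table API sketch in Java for orientation; it assumes a registered table "Orders" with columns user, product, and amount (the schema used in the SQL examples below) and is not taken from the linked page.
Java
// scan the registered table "Orders" (assumed schema: user, product, amount)
Table orders = tableEnv.from("Orders");
// the Table API equivalent of:
//   SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'
Table result = orders
    .filter($("product").like("%Rubber%"))
    .select($("product"), $("amount"));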
1.19.8. SQL
1.19.8.1. Overview
1.19.8.1.1. SQL
This page describes the SQL language supported by Flink, including Data Definition Language (DDL), Data Manipulation Language (DML), and query language. Flink's SQL support is based on Apache Calcite, which implements the SQL standard.
This page lists all the statements currently supported by Flink SQL:
SELECT (Queries)
CREATE TABLE, DATABASE, VIEW, FUNCTION
DROP TABLE, DATABASE, VIEW, FUNCTION
ALTER TABLE, DATABASE, FUNCTION
INSERT
SQL HINTS
DESCRIBE
EXPLAIN
USE
SHOW
1.19.8.1.2. Data Types
Please refer to the dedicated Data Types page (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/types.html).
Generic types and (nested) composite types (e.g. POJOs, tuples, rows, Scala case classes) can be fields of a row.
Fields of composite types with arbitrary nesting can be accessed with value access functions (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/functions/systemFunctions.html#value-access-functions).
Generic types are treated as a black box and can be passed on or processed by user-defined functions (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/functions/udfs.html).
For DDL statements, all data types defined on the Data Types page (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/types.html) are supported.
Note: Some data types are not supported in SQL queries (i.e., in CAST expressions or as character literal values), e.g. STRING, BYTES, RAW, TIME(p) WITHOUT TIME ZONE, TIME(p) WITH LOCAL TIME ZONE, TIMESTAMP(p) WITHOUT TIME ZONE, TIMESTAMP(p) WITH LOCAL TIME ZONE, ARRAY, MULTISET, ROW.
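In a DDL statement, by contrast, any of the listed types can be used directly; the sketch below is illustrative (the table name and the 'datagen' connector are assumptions, not part of the original text).
Java
// a hypothetical CREATE TABLE exercising several DDL-supported types
tableEnv.executeSql(
    "CREATE TABLE Products (" +
    "  id BIGINT," +
    "  name STRING," +
    "  tags ARRAY<STRING>," +
    "  ts TIMESTAMP(3)" +
    ") WITH ('connector' = 'datagen')");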
1.19.8.1.3. Reserved Keywords
Although not every SQL feature has been implemented yet, some string combinations are already reserved as keywords for future use. If you want to use one of the following strings as a field name, make sure to surround it with backticks (e.g. `value`, `count`); a short example follows the keyword list below.
A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, BYTES, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, RAW, READ, READS, REAL, 
RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRING, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE
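To sketch the escaping rule, the query below reads two columns named after the reserved words VALUE and COUNT (the table "Clicks" is hypothetical):
Java
// reserved words used as field names must be wrapped in backticks
Table escaped = tableEnv.sqlQuery(
    "SELECT `value`, `count` FROM Clicks");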
1.19.8.2. Query Statements
A SELECT statement or a VALUES statement is specified with the sqlQuery() method of the TableEnvironment. The method returns the result of the SELECT (or VALUES) statement as a Table. The Table can be used in subsequent SQL and Table API queries (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/common.html#mixing-table-api-and-sql), converted into a DataSet or DataStream (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/common.html#integration-with-datastream-and-dataset-api), or written to a TableSink (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/common.html#emit-a-table). SQL and Table API queries can be seamlessly mixed, are optimized holistically, and are translated into a single program.
In order to access a table in a SQL query, it must be registered in the TableEnvironment (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/common.html#register-tables-in-the-catalog). A table can be registered from a TableSource (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/common.html#register-a-tablesource), a Table (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/common.html#register-a-table), a CREATE TABLE statement (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sql/create.html), or a DataStream or DataSet (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/common.html#register-a-datastream-or-dataset-as-table). Alternatively, users can register catalogs in the TableEnvironment (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/catalogs.html) to specify the location of the data sources.
For convenience, Table.toString() automatically registers the table under a unique name in its TableEnvironment and returns the name, so Table objects can be inlined directly into SQL queries as shown in the examples below.
Note: Queries that include unsupported SQL features cause a TableException. The supported features of SQL on batch and streaming tables are listed in the following sections.
1.19.8.2.1. Specifying a Query
The following examples show how to specify a SQL query on registered and inlined tables.
Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
// SQL query with an inlined (unregistered) table
Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount"));
Table result = tableEnv.sqlQuery(
    "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");
// SQL query with a registered table
// register the DataStream as view "Orders"
tableEnv.createTemporaryView("Orders", ds, $("user"), $("product"), $("amount"));
// run a SQL query on the table and retrieve the result as a new Table
Table result2 = tableEnv.sqlQuery(
    "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
// create and register a TableSink
final Schema schema = new Schema()
    .field("product", DataTypes.STRING())
    .field("amount", DataTypes.INT());
tableEnv.connect(new FileSystem().path("/path/to/file"))
    .withFormat(...)
    .withSchema(schema)
    .createTemporaryTable("RubberOrders");
// run an INSERT SQL on the table and emit the result to the TableSink
tableEnv.executeSql(
    "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// ingest a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// SQL query with an inlined (unregistered) table
val table = ds.toTable(tableEnv, $"user", $"product", $"amount")
val result = tableEnv.sqlQuery(
  s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")
// register the DataStream under the name "Orders"
tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount")
// run a SQL query on the table and retrieve the result as a new Table
val result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
// create and register a TableSink
val schema = new Schema()
  .field("product", DataTypes.STRING())
  .field("amount", DataTypes.INT())
tableEnv.connect(new FileSystem().path("/path/to/file"))
  .withFormat(...)
  .withSchema(schema)
  .createTemporaryTable("RubberOrders")
// run an INSERT SQL on the table and emit the result to the TableSink
tableEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
Python
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
# SQL query with an inlined (unregistered) table
# elements data types: BIGINT, STRING, BIGINT
table = table_env.from_elements(..., ['user', 'product', 'amount'])
result = table_env \
    .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table)
# create and register a TableSink
table_env.connect(FileSystem().path("/path/to/file")) \
    .with_format(Csv()
                 .field_delimiter(',')
                 .derive_schema()) \
    .with_schema(Schema()
                 .field("product", DataTypes.STRING())
                 .field("amount", DataTypes.BIGINT())) \
    .create_temporary_table("RubberOrders")
# run an INSERT SQL on the table and emit the result to the TableSink
# (assumes a view "Orders" has been registered, as in the Java/Scala examples above)
table_env \
    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
1.19.8.2.2. Executing a Query
A SELECT statement or a VALUES statement can be executed, and its result collected to the local client, via the TableEnvironment.executeSql() method. The method returns a TableResult object that wraps the result of the query. Similarly, a Table object can be executed via the Table.execute() method to collect the content of the Table to the local client.
TableResult.collect() returns a closeable row iterator. The query job will not finish unless all result data has been collected, so the job should be actively closed via CloseableIterator#close() to avoid resource leaks. Alternatively, the query result can be printed to the local console via TableResult.print(). The result data in a TableResult can only be accessed once, so collect() and print() must not be called on the same TableResult instance.
TableResult.collect() and TableResult.print() behave slightly differently under different checkpointing settings (to enable checkpointing for a streaming job, see the checkpointing configuration: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#checkpointing).
For batch jobs, or streaming jobs without checkpointing, TableResult.collect() and TableResult.print() provide neither exactly-once nor at-least-once delivery guarantees. Query results become accessible to the client as soon as they are produced, but exceptions are thrown when the job fails and restarts.
For streaming jobs with exactly-once checkpointing, TableResult.collect() and TableResult.print() guarantee end-to-end exactly-once delivery: a result record becomes accessible to the client only after its corresponding checkpoint completes.
For streaming jobs with at-least-once checkpointing, TableResult.collect() and TableResult.print() guarantee end-to-end at-least-once delivery. Query results become accessible to the client as soon as they are produced, but the same result may be delivered to the client multiple times.
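In Java, for example, exactly-once checkpointing can be enabled on the table config before the job is submitted, as in the sketch below (the 10-second interval is an illustrative value; the Scala example further down sets the same options):
// enable exactly-once checkpointing for the streaming job
tableEnv.getConfig().getConfiguration().set(
    ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(
    ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));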
Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// environment settings (assumed here: default streaming mode; the original snippet left them undefined)
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
// execute SELECT statement
TableResult tableResult1 = tableEnv.executeSql("SELECT * FROM Orders");
// use a try-with-resources statement to make sure the iterator will be closed automatically
try (CloseableIterator<Row> it = tableResult1.collect()) {
    while (it.hasNext()) {
        Row row = it.next();
        // handle row
    }
}
// execute Table
TableResult tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute();
tableResult2.print();
Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
// environment settings (assumed here: default streaming mode; the original snippet left them undefined)
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)
// enable exactly-once checkpointing
tableEnv.getConfig.getConfiguration.set(
  ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(
  ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))
The remainder of the Scala example follows the same steps as the Java version above.