1.18.2.10 Explaining a Table: Table.explain, the Physical Execution Plan, etc.
1.18.2.10. Explaining a Table
The Table API provides a mechanism to explain the logical and optimized query plans used to compute a Table. This is done with the Table.explain() method or the StatementSet.explain() method. Table.explain() returns the plan of a single Table, while StatementSet.explain() returns the plan of a multi-sink job. Both return a string describing three plans:
the Abstract Syntax Tree of the relational query, i.e. the unoptimized logical query plan,
the optimized logical query plan, and
the physical execution plan.
TableEnvironment.explainSql() and TableEnvironment.executeSql() can also be used to run an EXPLAIN statement and obtain the logical and optimized query plans; see the EXPLAIN page for details.
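For example, the SQL route looks roughly like the following (a minimal sketch; the table name MyTable and the query are placeholders, and tEnv is assumed to be an existing TableEnvironment with MyTable already registered):
// Hypothetical query against a previously registered table "MyTable".
String plan = tEnv.explainSql(
        "SELECT `count`, word FROM MyTable WHERE word LIKE 'F%'");
System.out.println(plan);

// Alternatively, run an EXPLAIN statement directly and print its result rows.
tEnv.executeSql(
        "EXPLAIN PLAN FOR SELECT `count`, word FROM MyTable WHERE word LIKE 'F%'").print();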
The following code shows an example and the corresponding output of Table.explain() for a given Table:
Java code:
package com.toto.demo.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
        DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));

        // explain Table API
        Table table1 = tEnv.fromDataStream(stream1, $("count"), $("word"));
        Table table2 = tEnv.fromDataStream(stream2, $("count"), $("word"));
        Table table = table1
                .where($("word").like("F%"))
                .unionAll(table2);

        System.out.println(table.explain());
    }
}
Output:
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[Unregistered_DataStream_1]])
+- LogicalTableScan(table=[[Unregistered_DataStream_2]])
== Optimized Logical Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])
== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: Collection Source

Stage 2 : Data Source
    content : Source: Collection Source

Stage 3 : Operator
    content : SourceConversion(table=[Unregistered_DataStream_1], fields=[count, word])
    ship_strategy : FORWARD

    Stage 4 : Operator
        content : Calc(select=[count, word], where=[(word LIKE _UTF-16LE'F%')])
        ship_strategy : FORWARD

        Stage 5 : Operator
            content : SourceConversion(table=[Unregistered_DataStream_2], fields=[count, word])
            ship_strategy : FORWARD
Scala code:
package com.toto.learn.sql

// Wildcard imports bring in the Scala implicits needed for fromElements,
// the $"field" expression syntax, and the toTable conversion.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object Demo {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    val table1 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
    val table2 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
    val table = table1
      .where($"word".like("F%"))
      .unionAll(table2)

    println(table.explain())
  }
}
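Depending on the Flink version, Table.explain() also accepts optional ExplainDetail flags that add extra information to the returned string. The sketch below assumes Flink 1.11+ where org.apache.flink.table.api.ExplainDetail is available, and reuses the table from the Java example above:
// Request additional details: optimizer cost estimates and the changelog mode
// of each physical operator (both are optional varargs of explain()).
String detailedPlan = table.explain(
        ExplainDetail.ESTIMATED_COST,
        ExplainDetail.CHANGELOG_MODE);
System.out.println(detailedPlan);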
The following code shows an example and the corresponding output of a multi-sink plan obtained with StatementSet.explain():
Java code:
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

final Schema schema = new Schema()
    .field("count", DataTypes.INT())
    .field("word", DataTypes.STRING());

tEnv.connect(new FileSystem().path("/source/path1"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySource1");
tEnv.connect(new FileSystem().path("/source/path2"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySource2");
tEnv.connect(new FileSystem().path("/sink/path1"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySink1");
tEnv.connect(new FileSystem().path("/sink/path2"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySink2");

StatementSet stmtSet = tEnv.createStatementSet();

Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
stmtSet.addInsert("MySink1", table1);

Table table2 = table1.unionAll(tEnv.from("MySource2"));
stmtSet.addInsert("MySink2", table2);

String explanation = stmtSet.explain();
System.out.println(explanation);
Scala code:
val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
val tEnv = TableEnvironment.create(settings)

val schema = new Schema()
  .field("count", DataTypes.INT())
  .field("word", DataTypes.STRING())

tEnv.connect(new FileSystem().path("/source/path1"))
  .withFormat(new Csv().deriveSchema())
  .withSchema(schema)
  .createTemporaryTable("MySource1")
tEnv.connect(new FileSystem().path("/source/path2"))
  .withFormat(new Csv().deriveSchema())
  .withSchema(schema)
  .createTemporaryTable("MySource2")
tEnv.connect(new FileSystem().path("/sink/path1"))
  .withFormat(new Csv().deriveSchema())
  .withSchema(schema)
  .createTemporaryTable("MySink1")
tEnv.connect(new FileSystem().path("/sink/path2"))
  .withFormat(new Csv().deriveSchema())
  .withSchema(schema)
  .createTemporaryTable("MySink2")

val stmtSet = tEnv.createStatementSet()

val table1 = tEnv.from("MySource1").where($"word".like("F%"))
stmtSet.addInsert("MySink1", table1)

val table2 = table1.unionAll(tEnv.from("MySource2"))
stmtSet.addInsert("MySink2", table2)

val explanation = stmtSet.explain()
println(explanation)
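Note that StatementSet.explain() only returns the plan as a string and does not submit anything. To actually run both INSERTs as a single job, the statement set is executed separately; a short sketch continuing the Java example above:
// Submits all statements that were added to the StatementSet as one Flink job.
TableResult result = stmtSet.execute();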