1.18.2.10 解释表:Table.explain物理执行计划等

Posted to.to

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1.18.2.10 解释表:Table.explain物理执行计划等相关的知识,希望对你有一定的参考价值。

1.18.2.10.解释表

Table API 提供了一种机制来解释计算 Table 的逻辑和优化查询计划。 这是通过 Table.explain() 方法或者 StatementSet.explain() 方法来完成的。Table.explain() 返回一个 Table 的计划。StatementSet.explain() 返回多 sink 计划的结果。它返回一个描述三种计划的字符串:

关系查询的抽象语法树(the Abstract Syntax Tree),即未优化的逻辑查询计划
优化的逻辑查询计划,以及
物理执行计划。
可以用 TableEnvironment.explainSql() 方法和 TableEnvironment.executeSql() 方法支持执行一个 EXPLAIN 语句获取逻辑和优化查询计划,请参阅 EXPLAIN 页面.

以下代码展示了一个示例以及对给定Table使用Table.explain()方法的相应输出:
Java代码:

package com.toto.demo.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1,"hello"));
        DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));

        // explain Table API
        Table table1 = tEnv.fromDataStream(stream1, $("count"), $("word"));
        Table table2 = tEnv.fromDataStream(stream2, $("count"), $("word"));
        Table table = table1
                .where($("word").like("F%"))
                .unionAll(table2);
        System.out.println(table.explain());

    }

}

输出结果:

== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:  +- LogicalTableScan(table=[[Unregistered_DataStream_1]])
+- LogicalTableScan(table=[[Unregistered_DataStream_2]])

== Optimized Logical Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])

== Physical Execution Plan ==
Stage 1 : Data Source
	content : Source: Collection Source

Stage 2 : Data Source
	content : Source: Collection Source

	Stage 3 : Operator
		content : SourceConversion(table=[Unregistered_DataStream_1], fields=[count, word])
		ship_strategy : FORWARD

		Stage 4 : Operator
			content : Calc(select=[count, word], where=[(word LIKE _UTF-16LE'F%')])
			ship_strategy : FORWARD

			Stage 5 : Operator
				content : SourceConversion(table=[Unregistered_DataStream_2], fields=[count, word])
				ship_strategy : FORWARD

Scala代码:

package com.toto.learn.sql

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Demo {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    val table1 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
    val table2 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
    val table = table1
      .where($"word".like("F%"))
      .unionAll(table2)
    println(table.explain())
    
  }

}

以下代码展示了一个示例以及使用StatementSet.explain()的多sink计划的相应输出:

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

final Schema schema = new Schema()
    .field("count", DataTypes.INT())
    .field("word", DataTypes.STRING());

tEnv.connect(new FileSystem().path("/source/path1"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySource1");
tEnv.connect(new FileSystem().path("/source/path2"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySource2");
tEnv.connect(new FileSystem().path("/sink/path1"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySink1");
tEnv.connect(new FileSystem().path("/sink/path2"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySink2");

StatementSet stmtSet = tEnv.createStatementSet();

Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
stmtSet.addInsert("MySink1", table1);

Table table2 = table1.unionAll(tEnv.from("MySource2"));
stmtSet.addInsert("MySink2", table2);

String explanation = stmtSet.explain();
System.out.println(explanation);

Scala代码:

val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
val tEnv = TableEnvironment.create(settings)

val schema = new Schema()
    .field("count", DataTypes.INT())
    .field("word", DataTypes.STRING())

tEnv.connect(new FileSystem().path("/source/path1"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySource1")
tEnv.connect(new FileSystem().path("/source/path2"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySource2")
tEnv.connect(new FileSystem().path("/sink/path1"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySink1")
tEnv.connect(new FileSystem().path("/sink/path2"))
    .withFormat(new Csv().deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("MySink2")

val stmtSet = tEnv.createStatementSet()

val table1 = tEnv.from("MySource1").where($"word".like("F%"))
stmtSet.addInsert("MySink1", table1)

val table2 = table1.unionAll(tEnv.from("MySource2"))
stmtSet.addInsert("MySink2", table2)

val explanation = stmtSet.explain()
println(explanation)

以上是关于1.18.2.10 解释表:Table.explain物理执行计划等的主要内容,如果未能解决你的问题,请参考以下文章

我的解释语句中的派生表是啥

在线等,请解释LL(1)分析表是如何构造的,多谢

Oracle解释计划解析——Oracle做全表访问

Wordpress 样式表问题:资源解释为样式表,但使用 MIME 类型传输

资源解释为样式表,但使用 mime 传输

sql所有连接解释