1.18.2.8与DataStream和DataSet API结合,Scala隐式转换,通过DataSet或DataStream创建视图,将DataStream或DataSet转换成表 等

Posted to.to

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1.18.2.8与DataStream和DataSet API结合,Scala隐式转换,通过DataSet或DataStream创建视图,将DataStream或DataSet转换成表 等相关的知识,希望对你有一定的参考价值。

1.18.2.8.与DataStream和DataSet API结合
1.18.2.8.1.Scala隐式转换
1.18.2.8.2.通过DataSet或DataStream创建视图
1.18.2.8.3.将DataStream或DataSet转换成表
1.18.2.8.4.将表转换成DataStream或DataSet
1.18.2.8.5.将表转换成DataStream
1.18.2.8.5.1.将表转换成DataSet
1.18.2.8.6.数据类型到Table Schema的映射
1.18.2.8.6.1.原子类型
1.18.2.8.6.2.Tuple类型(Scala和Java)和 Case Class类型(仅Scala)
1.18.2.8.6.3.POJO类型(Java和Scala)
1.18.2.8.6.4.Row类型

1.18.2.8.与DataStream和DataSet API结合

在流处理方面两种计划器都可以与 DataStream API 结合。只有旧计划器可以与 DataSet API 结合。在批处理方面,Blink 计划器不能同两种计划器中的任何一个结合。

**注意:**下文讨论的DataSet API只与旧计划器有关。

Table API 和 SQL 可以被很容易地集成并嵌入到 DataStream 和 DataSet 程序中。例如,可以查询外部表(例如从 RDBMS),进行一些预处理,例如过滤,投影,聚合或与元数据 join,然后使用 DataStream 或 DataSet API(以及在这些 API 之上构建的任何库,例如 CEP 或 Gelly)。相反,也可以将 Table API 或 SQL 查询应用于 DataStream 或 DataSet 程序的结果。

这种交互可以通过 DataStream 或 DataSet 与 Table 的相互转化实现。

1.18.2.8.1.Scala隐式转换

Scala Table API 含有对 DataSet、DataStream 和 Table 类的隐式转换。 通过为 Scala DataStream API 导入org.apache.flink.table.api.bridge.scala._ 包以及 org.apache.flink.api.scala._ 包,可以启用这些转换。

1.18.2.8.2.通过DataSet或DataStream创建视图

在TableEnvironment中可以将DataStream或DataSet注册成视图。结果视图的schema取决于注册的DataStream或DataSet的数据类型。请参阅文档“数据类型到table schema的映射”(https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/common.html#mapping-of-data-types-to-table-schema)获取详细信息。

**注意:**通过 DataStream 或 DataSet 创建的视图只能注册成临时视图。
Java代码版

package com.toto.demo.sql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import scala.Tuple2;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {

    public static void main(String[] args) {
        // get StreamTableEnvironment
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

        // get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream);

// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));        
}

}

Scala版

package com.toto.learn.sql

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Demo {

  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
    // or val bsTableEnv = TableEnvironment.create(bsSettings)

   // get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, String)] = ...

// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream)

// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, 'myLong, 'myString')
  }

}
1.18.2.8.3.将DataStream或DataSet转换成表

与在TableEnvironment 注册 DataStream 或 DataSet 不同,DataStream 和 DataSet 还可以直接转换成Table。如果你想在 Table API 的查询中使用表,这将非常便捷。
Java代码版

package com.toto.demo.sql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import scala.Tuple2;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {

    public static void main(String[] args) {
        // get StreamTableEnvironment
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

        DataStream<Tuple2<Long, String>> stream = ...

        //Convert the DataStream into a Table with default fields "f0", "f1"
        Table table1 = tableEnv.fromDataStream(stream);

        // Convert the DataStream into a Table with fields "myLong", "myString"
        Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));

    }

}

Scala版

package com.toto.learn.sql

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Demo {

  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
    // or val bsTableEnv = TableEnvironment.create(bsSettings)

    // get TableEnvironment
    // registration of a DataSet is equivalent
    val tableEnv = ... // see "Create a TableEnvironment" section

    val stream: DataStream[(Long, String)] = ...

    // convert the DataStream into a Table with default fields "_1", "_2"
    val table1: Table = tableEnv.fromDataStream(stream)

    // convert the DataStream into a Table with fields "myLong", "myString"
    val table2: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")
  }

}
1.18.2.8.4.将表转换成DataStream或DataSet

Table可以被转换成 DataStream 或 DataSet。通过这种方式,定制的 DataSet 或 DataStream 程序就可以在 Table API 或者 SQL 的查询结果上运行了。

将 Table 转换为 DataStream 或者 DataSet 时,你需要指定生成的 DataStream 或者 DataSet 的数据类型,即,Table 的每行数据要转换成的数据类型。通常最方便的选择是转换成 Row 。以下列表概述了不同选项的功能:
Row: 字段按位置映射,字段数量任意,支持 null 值,无类型安全(type-safe)检查。
POJO: 字段按名称映射(POJO 必须按Table 中字段名称命名),字段数量任意,支持 null 值,无类型安全检查。
Case Class: 字段按位置映射,不支持 null 值,有类型安全检查。
Tuple: 字段按位置映射,字段数量少于 22(Scala)或者 25(Java),不支持 null 值,无类型安全检查。
Atomic Type: Table 必须有一个字段,不支持 null 值,有类型安全检查。

1.18.2.8.5.将表转换成DataStream

流式查询(streaming query)的结果表会动态更新,即,当新纪录到达查询的输入流时,查询结果会改变。因此,像这样将动态查询结果转换成 DataStream 需要对表的更新方式进行编码。

将Table 转换为 DataStream 有两种模式:
1.Append Mode: 仅当动态Table 仅通过INSERT更改进行修改时,才可以使用此模式,即,它仅是追加操作,并且之前输出的结果永远不会更新。
2.Retract Mode: 任何情形都可以使用此模式。它使用 boolean 值对 INSERT 和 DELETE 操作的数据进行标记。
Java代码

package com.toto.demo.sql;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Demo {

    public static void main(String[] args) {
        // get StreamTableEnvironment
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

        Table table = ...

        // convert the Table into an append DataStream of Row by specifying the class
        DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

        // convert the Table into an append DataStream of Tuple2<String, Integer>
        //   via a TypeInformation
        TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
                Types.STRING,
                Types.INT);
        DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType);

        //convert the Table into a retract DataStream of Row.
        //A retract stream of type X  is a DataStream<Tuple2<Boolean,X>>.
        //The boolean field indicates the type of change.
        //True is INSERT,false is DELETE.
        DataStream<Tuple2<Boolean,Row>> retractStream =
                tableEnv.toRetractStream(table, Row.class);
    }

}

Scala代码

package com.toto.demo.sql;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Demo {

    public static void main(String[] args) {
        // get StreamTableEnvironment
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

        Table table = ...

        // convert the Table into an append DataStream of Row by specifying the class
        DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

        // convert the Table into an append DataStream of Tuple2<String, Integer>
        //   via a TypeInformation
        TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
                Types.STRING,
                Types.INT);
        DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType);

        //convert the Table into a retract DataStream of Row.
        //A retract stream of type X  is a DataStream<Tuple2<Boolean,X>>.
        //The boolean field indicates the type of change.
        //True is INSERT,false is DELETE.
        DataStream<Tuple2<Boolean,Row>> retractStream =
                tableEnv.toRetractStream(table, Row.class);
    }

}

注意: 文档动态表给出了有关动态表及其属性的详细讨论。
**注意:**一旦Table被转化为DataStream,必须使用StreamExecutionEnvironment 的 execute 方法执行该 DataStream作业。

1.18.2.8.5.1.将表转换成DataSet

将Table转换成DataSet的过程如下:
Java代码

package com.toto.demo.sql;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class Demo {

    public static void main(String[] args) {
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(fbEnv);

        // Table with two fields (String name, Integer age)
        Table table = ...

        // convert the Table into a DataSet of Row by specifying a class
        DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

        //convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
        TupleTypeInfo<Tuple2<String,Integer>> tupleType = new TupleTypeInfo<>(
                Types.STRING,
                Types.INT
        );
        DataSet<Tuple2<String,Integer>> dsTuple = tableEnv.toDataSet(table,tupleType);

    }

}

Scala代码:

package com.toto.learn.sql

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
import org.apache.flink.types.Row

object Demo {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)

    // Table with two fields (String name, Integer age)
    val table: Table = ...

    // convert the Table into a DataSet of Row
    val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

    // convert the Table into a DataSet of Tuple2[String, Int]
    val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

  }

}

**注意:**一旦Table被转化为DataSet,必须使用ExecutionEnvironment 的 execute 方法执行该 DataSet作业。

1.18.2.8.6.数据类型到Table Schema的映射

Flink 的 DataStream 和 DataSet APIs 支持多样的数据类型。例如 Tuple(Scala 内置以及Flink Java tuple)、POJO 类型、Scala case class 类型以及 Flink 的 Row 类型等允许嵌套且有多个可在表的表达式中访问的字段的复合数据类型。其他类型被视为原子类型。下面,我们讨论 Table API 如何将这些数据类型类型转换为内部 row 表示形式,并提供将 DataStream 转换成 Table 的样例。

数据类型到table schema的映射有两种方式:基于字段位置或基于字段名称。

基于位置映射
基于位置的映射可在保持字段顺序的同时为字段提供更有意义的名称。这种映射方式可用于具有特定的字段顺序的复合数据类型以及原子类型。如 tuple、row 以及 case class 这些复合数据类型都有这样的字段顺序。然而,POJO 类型的字段则必须通过名称映射(参见下一章)。可以将字段投影出来,但不能使用as重命名。

定义基于位置的映射时,输入数据类型中一定不能存在指定的名称,否则 API 会假定应该基于字段名称进行映射。如果未指定任何字段名称,则使用默认的字段名称和复合数据类型的字段顺序,或者使用 f0 表示原子类型。

Java代码

package com.toto.demo.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo {

    public static void main(String[] args) {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment()以上是关于1.18.2.8与DataStream和DataSet API结合,Scala隐式转换,通过DataSet或DataStream创建视图,将DataStream或DataSet转换成表 等的主要内容,如果未能解决你的问题,请参考以下文章

Flink 1.13,面向流批一体的运行时与 DataStream API 优化

DataStream与DataSet

数据湖(十七):Flink与Iceberg整合DataStream API操作

数据湖(十七):Flink与Iceberg整合DataStream API操作

Flink 源码解读系列 DataStream 时间服务管理器 TimeServiceManager 设计与实现

Flink 源码解读系列 DataStream 时间服务管理器 TimeServiceManager 设计与实现