Flink流式计算从入门到实战 四

Posted roykingw

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink流式计算从入门到实战 四相关的知识,希望对你有一定的参考价值。

Flink流式计算实战专题四

Flink的Table API和SQL

==楼兰

六、Flink Table API 和Flink SQL

1、Table API和SQL是什么?

​ 接下来理解下Flink的整个客户端API体系,Flink为流式/批量处理应用程序提供了不同级别的抽象:

​ 这四层API是一个依次向上支撑的关系。

  • Flin API最底层的抽象就是有状态实时流处理 Stateful Stream Processing,是最底层的Low-Level API。实际上就是基于ProcessFunction提供的一整套API。在上面侧输出流部分,已经接触到了一个示例。这是最灵活,功能最全面的一层客户端API,允许应用程序可以定制复杂的计算过程。但是这一层大部分的常用的功能都已经封装在了上层的Core API当中,大部分的应用都不会需要使用到这一层API。
  • Core APIs主要是DataStream API以及针对批处理的DataSet API。这是最为常用的一套API。其中,又以DataStream API为主。他们其实就是基于一系列ProcessFunction做的一些高层次的封装,可以极大的简化客户端应用程序的开发。
  • Table API主要是表(Table)为中心的声明式编程API。他允许应用程序像操作关系型数据库一样对数据进行一些select\\join\\groupby等典型的逻辑操作,并且也可以通过用户自定义函数进行功能扩展,而不用确切地指定程序指定的代码。当然,Table API的表达能力还是不如Core API灵活。大部分情况下,用户程序应该将Table API和DataStream API混合使用。
  • SQL是Flink API中最顶层的抽象。功能类似于Table API,只是程序实现的是直接的SQL语句支持。本质上还是基于Table API的一层抽象。

​ Table API和Flink SQL是一套给Java和Scalal语言提供的快速查询数据的API,在Python语言客户端中也可以使用。他们是集成在一起的一整套API。通过Table API,用户可以像操作数据库中的表一样查询流式数据。 这里注意Table API主要是针对数据查询操作,而"表"中数据的本质还是对流式数据的抽象。而SQL则是直接在"表"上提供SQL语句支持。

​ 其实这种思路在流式计算中是非常常见的,像kafka Streams中提供了KTable封装,Spark中也提供了SparkSQL进行表操作。不过目前版本的Flink中的Table API和SQL还处在活跃开发阶段,很多特性还并没有完全稳固。

2、如何使用Table API

​ 使用Table API和SQL,需要引入maven依赖。

​ 首先需要引入一个语言包

<!-- java客户端 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.3</version>
  <scope>provided</scope>
</dependency>

另外也提供了scala语言的依赖版本

<!-- scala客户端 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.12.3</version>
<scope>provided</scope>
</dependency>

​ 然后需要引入一个Planner

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.12.3</version>
  <scope>provided</scope>
</dependency>

1.9版本之前还有另一个老版本的planner,但是从1.11版本开始官方就已经不建议使用了。

<!-- 针对Flink 1.9以前的老版本,1.11之后官方不建议使用 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.12.3</version>
<scope>provided</scope>
</dependency>

​ 接下来如果要使用一些自定义函数的话,还需要引入一个扩展依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.12.3</version>
  <scope>provided</scope>
</dependency>

注意下,为什么这些依赖都使用了provided的scope呢?因为这些maven依赖的jara包,在flink的部署环境中都有。如果需要添加一些新的jar包,那就需要手动把jar包复制进去。

3、基础编程框架

​ Flink中对批处理和流处理的Table API 和SQL 程序都遵循一个相同的模式,都是象下面示例中的这种结构。

// create a TableEnvironment for specific planner batch or streaming
TableEnvironment tableEnv = ...;

// create an input Table
tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )");
// register an output Table
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");

// create a Table object from a Table API query
Table table2 = tableEnv.from("table1").select(...);
// create a Table object from a SQL query
Table table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");

// emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = table2.executeInsert("outputTable");
tableResult...

​ 基本的步骤都是这么几个:

  • 创建TableEnvironment
  • 将流数据转换成动态表 Table
  • 在动态表上计算一个连续查询,生成一个新的动态表
  • 生成的动态表再次转换回流数据

3.1 创建TableEnvironment

​ TableEnvironment是Table API 和SQL 的核心概念。未来的所有重要操作,例如窗口注册,自定义函数(UDF)注册等,都需要用到这个环境。

​ 对于流式数据,直接通过StreamExecutionEnvironment就可以创建。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

​ 在构建Table运行环境时,还可以指定一个配置对象。

final EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .withBuiltInCatalogName("default_catalog")
                .withBuiltInDatabaseName("default_database").build();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);

​ 示例中这个配置对象,设置了三个属性,都是取的默认值。

​ 首先关于Planner,Flink从1.11版本开始,就已经将默认的Planner改为了Blink。

​ 然后在配置中指定了Catalog和Database的名字。在Flink中,表对象的层次结构是Catalog -> Database -> Table。这就相当于是mysql中的schema。示例中指定的两个值就是Flink提供的默认值,也可以自行进行指定。

3.2 将流数据转换成动态表 Table

​ Flink中的表Table与关系型数据库中的表Table是有区别的。Flink中的表是随时间不短变化的,流中的每条记录都被解释为对结果表的insert操作。而Flink的TableAPI是让应用程序可以像查询静态表一样查询这些动态表。但是基于动态表的查询,其结果也是动态的,这个查询永远不会停止。所以,也需要用一个动态表来接收动态的查询结果。

final URL resource = FileRead.class.getResource("/stock.txt");
        final String filePath = resource.getFile();
//        final DataStreamSource<String> stream = env.readTextFile(filePath);
        final DataStreamSource<String> dataStream = env.readFile(new TextInputFormat(new Path(filePath)), filePath);
        final SingleOutputStreamOperator<Stock> stockStream = dataStream
                .map((MapFunction<String, Stock>) value -> {
                    final String[] split = value.split(",");
                    return new Stock(split[0], Double.parseDouble(split[1]), split[2], Long.parseLong(split[3]));
                });
final Table stockTable = tableEnv.fromDataStream(stockStream);

​ 其实关键的就是最后这一行。将一个DataStream转换成了一个stockTable。接下来,就可以使用Table API来对stockTable进行类似关系型数据库的操作了。

  final Table table = stockTable.groupBy($("id"), $("stockName"))
                .select($("id"), $("stockName"), $("price").avg().as("priceavg"))
                .where($("stockName").isEqual("UDFStock"));

​ 整个操作过程跟操作一个关系型数据库非常类似。例如示例中的代码,应该一看就能明白。这里需要注意下,对于groupBy,select,where这些操作算子,老版本支持传入字符串,但是在1.12版本中已经标注为过时了。当前版本需要传入一个由$转换成的Expression对象。这个$不是一个特殊的符号,而是Flink中提供的一个静态API。

​ 另外,Flink提供了SQL方式来简化上面的查询过程。

tableEnv.createTemporaryView("stock",stockTable);
        String sql = "select id,stockName,avg(price) as priceavg from stock where stockName='UDFStock' group by id,stockName";
        final Table sqlTable = tableEnv.sqlQuery(sql);

​ 使用SQL需要先注册一个表,然后才能针对表进行SQL查询。注册时,createTemporaryView表示注册一个只与当前任务相关联的临时表。这些临时表在多个Flink会话和集群中都是可见的。

3.3 将Table重新转换为DataStream

​ 通过SQL查询到对应的数据后,通常有两种处理方式:

​ 一种是将查询结果转换回DataStream,进行后续的操作。

 //转换成流
        final DataStream<Tuple2<Boolean, Tuple3<String, String, Double>>> sqlTableDataStream = tableEnv.toRetractStream(sqlTable, TypeInformation.of(new TypeHint<Tuple3<String, String, Double>>() {
        }));
        sqlTableDataStream.print("sql");

​ 另一种是将查询结果插入到另一个表中,并通过另一张表对应Sink将结果输出到目标Sink中。

示例代码 com.roy.flink.table.FileTableDemo 演示了一个基本的对文件系统数据进行索引的Table和SQL操作。

4、扩展编程框架

​ 下面将针对上一章节的几个步骤,进行一部分扩展。

4.1 临时表与永久表

​ 在3.2章节注册动态表时,可以选择注册为临时表或者是永久表。临时表只能在当前任务中访问。任务相关的所有Flink的会话Session和集群Cluster都能够访问表中的数据。但是任务结束后,这个表就会删除。

​ 而永久表则是在Flink集群的整个运行过程中都存在的表。所有任务都可以像访问数据库一样访问这些永久表,直到这个表被显示的删除。

​ 表注册完成之后,可以将Table对象中的数据直接插入到表中。

//创建临时表
tableEnv.createTemporatyView("Order",orders)
//创建永久表
Table orders = tableEnv.from("Orders");
orders.executeInsert("OutOrders");
//老版本的insertInto方法已经过期,不建议使用。

示例代码 com.roy.flink.table.PermanentFileTableDemo 演示了一个基于文件的永久表。

​ Flink的永久表需要一个catalog来维护表的元数据。一旦永久表被创建,任何连接到这个catalog的Flink会话都可见并且持续存在。直到这个表被明确删除。也就是说,永久表是在Flink的会话之间共享的。

​ 而临时表则通常保存于内存中,并且只在创建他的Flink会话中存在。这些表对于其他会话是不可见的。他们也不需要与catalog绑定。临时表是不共享的。

​ 在Table对象中也能对表做一些结构化管理的工作,例如对表中的列进行增加、修改、删除、重命名等操作,但是通常都不建议这样做。原因还是因为Flink针对的是流式数据计算,他的表保存的应该只是计算过程中的临时数据,频繁的表结构变动只是增加计算过程的复杂性。

​ 最后,当一个会话里有两个重名的临时表和永久表时,将会只有临时表生效。如果临时表没有删除,那么永久表就无法访问。这个特性在做开发测试时是非常好用的。可以很容易的做Shadowing影子库测试。

4.2 AppendStream和RetractStream

​ 在3.3章节将Table转换成为DataStream时,我们用的是tableEnv.toRetractStream方法。另外还有一个方法是tableEnv.toAppendStream方法。这两个方法都是将Table转换成为DataStream。但是在我们这个示例com.roy.flink.table.FileTableDemo中如果使用toAppendStream方法,则会报错:

//代码
final DataStream<Tuple3<String, String, Double>> tuple3DataStream 
                = tableEnv.toAppendStream(sqlTable, TypeInformation.of(new TypeHint<Tuple3<String, String, Double>>() {}));
//异常
Exception in thread "main" org.apache.flink.table.api.TableException: toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[id, stockName], select=[id, stockName, AVG(price) AS priceavg])

​ 异常信息很明显,groupby语句不支持toAppendStream。这是为什么呢?要理解这个异常,就要从这两种结果流模式说起。

​ 我们现在的代码虽然看起来是在用SQL处理批量数据,但是本质上,数据依然是流式的,是一条一条不断进来的。这时,当处理增量数据时,将表的查询结果转换成DataStream时,就有两种不同的方式。

​ 一种是将新来的数据作为新数据,不断的追加到Flink的表中。这种方式就是toApppendStream。

​ 另一种方式是用新来的数据覆盖Flink表中原始的数据。这种方式就是toRestractStream。在他的返回类型中可以看到,他会将boolean与原始结果类型拼装成一个Tuple2组合。前面的这个boolean结果就表示这条数据是覆盖还是插入。true表示插入,false表示覆盖。

​ 很显然,经过groupby这种统计方式后,我们需要的处理结果是分组计算后的一个统计值。这个统计值只能覆盖,不能追加,所以才会有上面的错误。

4.3 内置函数与自定义函数

​ 在SQL操作时,我们经常会调用一些函数,像count()、max()等等。 Flink也提供了非常丰富的内置函数。这些函数即可以在Table API中调用,也可以在SQL中直接调用。调用的方式跟平常在关系型数据库中调用方式差不多。

​ 具体内置函数就不再一一梳理了,可以参见官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/functions/systemFunctions.html

​ 我们这里重点介绍下自定义函数,UDF。这些自定义函数显著扩展了查询的表达能力。使用自定义函数时需要注意以下两点:

**1、大多数情况下,用户自定义的函数需要先注册,然后才能在查询中使用。**注册的方法有两种

//注册一个临时函数 
tableEnv.createTemporaryFunction(String path, Class<? extends UserDefinedFunction> functionClass);
//注册一个临时的系统函数
 tableEnv.createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);

​ 这两者的区别在于,用户函数只在当前Catalog和Database中生效。而系统函数能由独立于Catalog和Database的全局名称进行标识。所以使用系统函数可以继承Flink的一些内置函数,比如trim,max等

老版本使用的TableEnvironment的registerFunction()方法已经过期。

**2、自定义函数需要按照函数类型继承一个Flink中指定的函数基类。**Flink中有有以下几种函数基类:

  • 标量函数 org.apache.flink.table.functions.ScalarFunction。

    标量函数可以将0个或者多个标量值,映射成一个新的标量值。例如常见的获取当前时间、字符串转大写、加减法、多个字符串拼接,都是属于标量函数。例如下面定义一个hash方法

    public static class HashCode extends ScalarFunction {
    	private int factor = 13;
    	public HashCode(int factor) {
    		this.factor = factor;
    	}
    	public int eval(String s) {
    		return s.hashCode() * factor;
    	}
    }
    
    

示例代码 com.roy.flink.table.ScalarUDFDemo

  • 表函数 org.apache.flink.table.functions.TableFunction

    表函数同样以0个或者多个标量作为输入,但是他可以返回任意数量的行作为输出,而不是单个值。例如下面这个简单的字符串拆分函数

    public class Split extends TableFunction<String> {
       private String separator = ",";
       public Split(String separator) {
          this.separator = separator;
       }
       public void eval(String str) {
         for (String s : str.split(" ")) {
           collect(s);   // use collect(...) to emit an output row
         }
       }
     }
    
    

示例代码 com.roy.flink.table.TableUDFDemo

  • 聚合函数 org.apache.flink.table.functions.AggregateFunction

    聚合函数可以把一个表中一列的数据,聚合成一个标量值。例如常用的max、min、count这些都是聚合函数。定义聚合函数时,首先需要定义个累加器Accumulator,用来保存聚合中间结果的数据结构,可以通过createAccumulator()方法构建空累加器。然后通过accumulate()方法来对每一个输入行进行累加值更新。最后调用getValue()方法来计算并返回最终结果。例如下面是一个计算字符串出现次数的count方法。

    public static class CountFunction extends AggregateFunction<String, CountFunction.MyAccumulator> {
       public static class MyAccumulator {
         public long count = 0L;
       }
    
       public MyAccumulator createAccumulator() {
         return new MyAccumulator();
       }
        
       public void accumulate(MyAccumulator accumulator, Integer i) {
         if (i != null) {
           accumulator.count += i;
         }
       }
    
       public String getValue(MyAccumulator accumulator) {
         return "Result: " + accumulator.count;
       }
     }
    
    

    ​ 常用的自定义函数这些,Flink中也还提供了其他一些函数基类,有兴趣可以再深入了解。另外,这些函数基类都是实现了UserDefinedFunction这个接口,也就是说,应用程序完全可以基于UserDefinedFunction接口进行更深入的函数定制。这里就不再多做介绍了。

​ 另外也可以通过aggregate()函数进行一些聚合操作,例如sum 、max等等。这样将获得一个AggregatedTable。例如

tab.aggregate(call(MyAggregateFunction.class, $("a"), $("b")).as("f0", "f1", "f2"))
   .select($("f0"), $("f1"));

示例代码 com.roy.flink.table.TableUDFDemo

这一块的特性和API都还处在活跃开发阶段,也就是不稳定阶段。所以,相比学习这些代码示例,更重要的是要学会Flink的Table 和 SQL的基础处理思想,并且要学会如何查看源码猜测使用方式。

官方文档通常是最靠谱的资料。具体操作可以参见官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/tableApi.html

4.4 基于Connector进行数据流转

​ 由于Flink中的流数据,大部分情况下,都是映射的一个外部的数据源,所以,通常创建表时,也需要通过connector映射外部的数据源。关于Connector,之前已经介绍过。基于Connector来注册表的通用方式是这样:

tableEnv
.connect(...) // 定义表的数据来源,和外部系统建立连接
.withFormat(...) // 定义数据格式化方法
.withSchema(...) // 定义表结构
.createTemporaryTable("MyTable"); // 创建临时表

​ 例如,针对文本数据

tableEnv
.connect(
new FileSystem().path(YOUR_Path/sensor.txt”)
) // 定义到文件系统的连接
.withFormat(new Csv()) // 定义以csv格式进行数据格式化
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
) // 定义表结构
.createTemporaryTable("sensorTable"); // 创建临时表

针对kafa数据

tableEnv.connect(
    new Kafka()
    .version("0.11")
    .topic("sinkTest")
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")
    )
.withFormat( new Csv() )
.withSchema( new Schema()
    .field("id", DataTypes.STRING())
    .field("temp", DataTypes.DOUBLE())
    )
.Flink流式计算从入门到实战 一

Flink流式计算从入门到实战 三

Flink流式计算从入门到实战 四

flink入门-流式计算概念

Flink实战|小米流式平台架构演进与实践

新一代大数据计算引擎 Flink从入门到实战