Flink Learning: The Flink Table/SQL API

Posted by 我爱夜来香A


Flink Table / SQL

  • Like the DataStream API, the Table API and SQL share a common programming model. You first build a TableEnvironment to create the relational programming context; only then can you write applications with the Table API or SQL. The two interfaces can also be mixed within the same application.

I. Obtaining a TableEnvironment

1. Streaming applications

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment

object SqlTest {
  def main(args: Array[String]): Unit = {
    // Create the streaming execution environment and its table environment
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)
  }
}

2. Batch applications

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment

object SqlTest {
  def main(args: Array[String]): Unit = {
    // Batch applications use ExecutionEnvironment instead of StreamExecutionEnvironment
    val batchEnv = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(batchEnv)
  }
}

  • In both cases the TableEnvironment is created first; the argument to getTableEnvironment is the execution environment of the corresponding application type (streaming or batch).

II. Registering a Catalog

(1) Internal catalog registration

1. Registering an internal Table

  • Use scan to look up a table by name in the internal catalog, then select some of its fields to form a new Table
  • Use registerTable to register the new Table in the catalog
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment

object SqlTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)

    // Scan an already-registered table and project some of its fields,
    // then register the projected result under a new table name
    val projTable = tEnv.scan("SensorsTable").select(...)
    tEnv.registerTable("projectedTable", projTable)
  }
}

2. Registering a TableSource

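The original leaves this subsection without code. A minimal sketch using Flink's built-in CsvTableSource might look like the following; the file path, table name, and schema here are made up for illustration:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sources.CsvTableSource

object TableSourceTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)

    // Build a CsvTableSource; the path and field definitions are illustrative only
    val csvSource = CsvTableSource.builder()
      .path("/tmp/sensors.csv")
      .field("id", Types.INT)
      .field("name", Types.STRING)
      .build()

    // Register the source under a table name so Table API / SQL queries can refer to it
    tEnv.registerTableSource("CsvSensorsTable", csvSource)
  }
}
```

Once registered, the table can be queried like any internal table, e.g. via tEnv.scan("CsvSensorsTable").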

3. Registering a TableSink
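This subsection is likewise empty in the original. A minimal sketch using the built-in CsvTableSink might look as follows (the output path, table name, and schema are assumptions for illustration):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink

object TableSinkTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)

    // A CsvTableSink writes results as delimiter-separated lines; the path is illustrative
    val csvSink = new CsvTableSink("/tmp/output.csv", ",")

    // Register the sink together with its schema; queries can then INSERT INTO it
    val fieldNames = Array("id", "name")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING)
    tEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
  }
}
```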

(2) External catalogs

Besides using Flink's internal catalog as the metadata store for all Table data, an external catalog can be used. A custom external catalog is implemented against the ExternalCatalog interface; Flink also ships a ready-made in-memory implementation, InMemoryExternalCatalog, which is used in the code below.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.catalog.InMemoryExternalCatalog

object SqlTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)

    // InMemoryExternalCatalog takes the catalog name as its constructor argument
    val inMemCatalog = new InMemoryExternalCatalog("externalCatalog")
    tEnv.registerExternalCatalog("externalCatalog", inMemCatalog)
  }
}

(3) Converting a DataStream/DataSet to a Table

The Table/SQL API is a higher-level abstraction built on top of the DataStream and DataSet APIs, so a DataStream or DataSet can be converted into a Table.
1. Registering a DataStream as a Table (registerDataStream)

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment

object SqlTest {
  def main(args: Array[String]): Unit = {
    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(sEnv)

    val stream = sEnv.fromElements((192, "nie"), (200, "hu"))

    // Register the DataStream as a table named testTable (default field names f0, f1)
    tEnv.registerDataStream("testTable", stream)
    // Register it again under a different name with explicit field names id and name
    // (registering the same table name twice would fail)
    tEnv.registerDataStream("testTable2", stream, "id, name")
  }
}

2. Converting a DataStream to a Table (fromDataStream)

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}

object SqlTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)

    val stream = streamEnv.fromElements((192, "nie"), (200, "hu"))

    // Convert the DataStream to a Table with default field names
    val table1: Table = tEnv.fromDataStream(stream)
    // Or convert it with explicit field names id and name
    val table2: Table = tEnv.fromDataStream(stream, "id, name")
    tEnv.registerTable("testTable", table2)
  }
}

III. Execution

(1) SQL API

Flink SQL is used through two TableEnvironment methods, sqlQuery and sqlUpdate:

  • sqlQuery: query a registered Table and process the data into a new Table
  • sqlUpdate: write the result of an SQL statement (e.g. INSERT INTO) into a registered table
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment

object SqlTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)

    // Element values must be Longs to match the declared stream type
    val stream: DataStream[(Long, String)] = streamEnv.fromElements((192L, "foo"), (122L, "fun"))
    tEnv.registerDataStream("testTable", stream)
    val result = tEnv.sqlQuery("select * from testTable")
  }
}
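The sqlUpdate counterpart is not shown above. A minimal sketch, assuming a CsvTableSink registered under the hypothetical name sinkTable with a matching schema, might look like:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink

object SqlUpdateTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)

    val stream: DataStream[(Long, String)] = streamEnv.fromElements((192L, "foo"), (122L, "fun"))
    tEnv.registerDataStream("testTable", stream, "id, name")

    // Register a sink table; the output path is illustrative
    val sink = new CsvTableSink("/tmp/result.csv", ",")
    tEnv.registerTableSink("sinkTable",
      Array("id", "name"),
      Array[TypeInformation[_]](Types.LONG, Types.STRING),
      sink)

    // sqlUpdate writes the query result into the registered sink table
    tEnv.sqlUpdate("INSERT INTO sinkTable SELECT id, name FROM testTable")
    streamEnv.execute("sqlUpdate example")
  }
}
```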

(2) Table API
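The original leaves this subsection empty. As a minimal sketch, the same query expressed with Table API operators instead of an SQL string (using the same illustrative data as above) might look like:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

object TableApiTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)

    val stream: DataStream[(Long, String)] = streamEnv.fromElements((192L, "foo"), (122L, "fun"))
    // Symbol-based field expressions ('id, 'name) require the table.api.scala._ import
    val table: Table = tEnv.fromDataStream(stream, 'id, 'name)

    // Chain relational operators on the Table instead of writing an SQL string
    val result: Table = table
      .filter('id > 150L)
      .select('id, 'name)
  }
}
```

The Table API builds the same logical plan as the equivalent SQL query, so the two styles can be freely mixed on registered tables.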
