Learning Flink: the Flink Table/SQL API
Posted by 我爱夜来香A
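Flink Table / SQL
- As with the DataStream API, the Table API and SQL share one programming model: you first build the corresponding TableEnvironment to create the relational programming environment, and only then can the program use the Table API and SQL. The Table API and SQL interfaces can also be used together in the same application.
I. Obtaining a TableEnvironment
1. Streaming applications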
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment

object SqlTest {
  def main(args: Array[String]): Unit = {
    // Execution environment for a streaming application
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // Table environment bound to the streaming environment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)
  }
}
2. Batch applications
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment

object SqlTest {
  def main(args: Array[String]): Unit = {
    // Execution environment for a batch application
    val batchEnv = ExecutionEnvironment.getExecutionEnvironment
    // Table environment bound to the batch environment
    val tEnv = TableEnvironment.getTableEnvironment(batchEnv)
  }
}
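- The TableEnvironment has to be created first. The argument passed to the method is the execution environment that matches the application type (streaming or batch).
II. Registering with the Catalog
(I) Registering in the internal Catalog
1. Registering an internal Table
- Use the scan method to look up a table by name in the internal Catalog, then select some of its fields to form a new table
- Use registerTable to register the new table in the Catalog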
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment

object SqlTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)
    // Look up SensorsTable in the internal Catalog and project some fields
    val projTable = tEnv.scan("SensorsTable").select(...)
    // Register the projected table in the Catalog under a new name
    tEnv.registerTable("projectedTable", projTable)
  }
}
2. Registering a TableSource
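A TableSource is registered in the internal Catalog by name, much like a Table. The following is a minimal sketch, assuming the same legacy Flink API (TableEnvironment.getTableEnvironment) used throughout this post and the CsvTableSource that ships with flink-table. The file path /tmp/sensors.csv, the table name CsvSensors and the two fields are hypothetical and chosen only for illustration.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sources.CsvTableSource

object TableSourceSketch {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)

    // Hypothetical CSV file with two columns: id (INT) and name (STRING)
    val csvSource = CsvTableSource.builder()
      .path("/tmp/sensors.csv")
      .field("id", Types.INT)
      .field("name", Types.STRING)
      .build()

    // Register the source in the internal Catalog under the name CsvSensors
    tEnv.registerTableSource("CsvSensors", csvSource)
  }
}
```

Once registered, the source can be read like any other table, for example with tEnv.scan("CsvSensors").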
3. Registering a TableSink
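A TableSink is registered together with the schema of the rows it accepts. The sketch below assumes the same legacy Flink API as the rest of this post and uses the CsvTableSink from flink-table. The output path /tmp/result.csv, the sink name CsvResult and the field list are hypothetical.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink

object TableSinkSketch {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)

    // Schema of the rows the sink will accept (hypothetical fields)
    val fieldNames: Array[String] = Array("id", "name")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING)

    // Hypothetical output path; "," is the field delimiter
    val csvSink = new CsvTableSink("/tmp/result.csv", ",")

    // Register the sink in the internal Catalog under the name CsvResult
    tEnv.registerTableSink("CsvResult", fieldNames, fieldTypes, csvSink)
  }
}
```

A registered sink is the target of an INSERT INTO statement or of Table.insertInto.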
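(II) External Catalog
Besides using Flink's internal Catalog as the metadata store for all Tables, you can also use an external Catalog. An external Catalog has to be provided by the user. The code below uses InMemoryExternalCatalog, an in-memory external catalog implementation, and registers it with the TableEnvironment.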
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.catalog.InMemoryExternalCatalog

object SqlTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)
    // Create the in-memory external catalog
    val inMemCatalog = new InMemoryExternalCatalog()
    // Register it with the TableEnvironment under the name externalCatalog
    tEnv.registerExternalCatalog("externalCatalog", inMemCatalog)
  }
}
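(III) Converting a DataStream/DataSet to a Table
The Table/SQL API is a higher-level abstraction built on top of the DataStream and DataSet APIs, so a DataStream or DataSet can be converted to a Table.
1. Registering a DataStream as a Table (registerDataStream)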
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object SqlTest {
  def main(args: Array[String]): Unit = {
    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(sEnv)
    val stream = sEnv.fromElements((192, "nie"), (200, "hu"))
    // Register the DataStream as a Table named testTable
    tEnv.registerDataStream("testTable", stream)
    // Alternatively, register it with explicit field names id, name.
    // A second name is used here because an already registered name cannot be reused.
    tEnv.registerDataStream("testTableNamed", stream, 'id, 'name)
  }
}
2. Converting a DataStream to a Table (fromDataStream)
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

object SqlTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)
    val stream = streamEnv.fromElements((192, "nie"), (200, "hu"))
    // Convert the DataStream to a Table
    val table1: Table = tEnv.fromDataStream(stream)
    // Convert the DataStream to a Table with field names id, name
    val table2: Table = tEnv.fromDataStream(stream, 'id, 'name)
    // Register the converted Table in the Catalog as testTable
    tEnv.registerTable("testTable", table2)
  }
}
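III. Execution
(I) SQL API
Flink SQL is used through two methods of the TableEnvironment, sqlQuery and sqlUpdate:
- sqlQuery: queries and processes data from the specified Table and produces a new Table
- sqlUpdate: writes the result of a query into a registered table by means of an SQL statement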
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

object SqlTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)
    val stream: DataStream[(Long, String)] = streamEnv.fromElements((192L, "foo"), (122L, "fun"))
    // Register the stream as testTable, then query it with SQL
    tEnv.registerDataStream("testTable", stream)
    val result: Table = tEnv.sqlQuery("select * from testTable")
  }
}
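The example above covers sqlQuery only. For sqlUpdate, a rough sketch under the same legacy API might look like the following. It assumes a CsvTableSink as the target; the sink name resultTable, the output path /tmp/result.csv and the field names id and name are hypothetical.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

object SqlUpdateSketch {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)

    // Source table with explicit field names id, name
    val stream: DataStream[(Long, String)] =
      streamEnv.fromElements((192L, "foo"), (122L, "fun"))
    tEnv.registerDataStream("testTable", stream, 'id, 'name)

    // Sink table whose schema matches the query result (hypothetical path)
    val fieldNames: Array[String] = Array("id", "name")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.LONG, Types.STRING)
    tEnv.registerTableSink(
      "resultTable", fieldNames, fieldTypes, new CsvTableSink("/tmp/result.csv", ","))

    // sqlUpdate writes the query result into the registered sink table
    tEnv.sqlUpdate("INSERT INTO resultTable SELECT id, name FROM testTable")

    // Nothing runs until the job is submitted
    streamEnv.execute("sqlUpdate sketch")
  }
}
```

The field names and types declared for the sink have to line up with the columns the SELECT produces, which is why the stream is registered with explicit field names here.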
(II) Table API
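With the Table API, the same kind of query is written as chained method calls on a Table instead of an SQL string. The sketch below assumes the legacy Scala Table API used in the rest of this post; the table name testTable, the field names id and name, and the filter threshold are hypothetical.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

object TableApiSketch {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)

    val stream: DataStream[(Long, String)] =
      streamEnv.fromElements((192L, "foo"), (122L, "fun"))
    tEnv.registerDataStream("testTable", stream, 'id, 'name)

    // Relational operators are chained as method calls on the Table
    val result: Table = tEnv.scan("testTable")
      .filter('id > 150L)
      .select('id, 'name)

    // Convert the result back to a DataStream so it can be printed
    result.toAppendStream[(Long, String)].print()

    streamEnv.execute("Table API sketch")
  }
}
```

Because a Table produced by the Table API can be registered with registerTable and then referenced in sqlQuery, the two styles can be mixed within one program.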