16. Flink Table & SQL: Why Table & SQL Is Needed
Posted on 涂作权's blog
16. Flink Table & SQL
16.1. Why Table & SQL Is Needed
16.2. History
16.3. API
16.4. Core Concepts
16.5. Example 1
16.6. Example 2
16.7. Example 3
16.8. Example 4
16. Flink Table & SQL
16.1. Why Table & SQL Is Needed
16.2. History
16.3. API
- Dependencies: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/
- Program structure: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html
- Creating the environment: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html
- Creating tables: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html
- Queries: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html
- Integration with DataStream: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html
16.4. Core Concepts
- Dynamic tables / unbounded tables
- Continuous queries, which rely on State
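Why a continuous query needs State can be seen in a small plain-Java sketch. This is not Flink code: the class ContinuousSumSketch and its method onRow are hypothetical names used only for illustration. Each new row of the unbounded table updates a running sum per key, and the earlier result for that key has to be remembered so that it can be retracted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; the names here are assumptions, not Flink API.
final class ContinuousSumSketch {
    // The "state": the running sum per key, kept between input rows.
    private final Map<String, Long> sums = new HashMap<>();

    // Processes one new row of the dynamic table and returns the change messages it causes.
    List<String> onRow(String word, long frequency) {
        List<String> changes = new ArrayList<>();
        Long previous = sums.get(word);
        long updated = (previous == null ? 0L : previous) + frequency;
        if (previous != null) {
            // Retract the result that was emitted earlier for this key.
            changes.add("(false," + word + "," + previous + ")");
        }
        // Emit the new result for this key.
        changes.add("(true," + word + "," + updated + ")");
        sums.put(word, updated);
        return changes;
    }

    public static void main(String[] args) {
        ContinuousSumSketch query = new ContinuousSumSketch();
        System.out.println(query.onRow("Hello", 1));
        System.out.println(query.onRow("World", 1));
        System.out.println(query.onRow("Hello", 1));
    }
}
```

The second "Hello" row produces two messages: a retraction of the old sum and an insertion of the new one. Without the stored sums, neither message could be computed.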
16.5. Example 1
Convert DataStream data into a Table and a View, then run a query over them with SQL.
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Flink Table & SQL example: converting DataStream data into a Table.
 *
 * @author tuzuoquan
 * @date 2022/6/2 14:50
 */
public class Demo01 {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        //TODO 1.source
        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2)));
        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                new Order(2L, "pen", 3),
                new Order(2L, "rubber", 3),
                new Order(4L, "beer", 1)));

        //TODO 2.transformation
        // Convert the DataStream data into a Table and a View, then query them
        Table tableA = tenv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
        tableA.printSchema();
        System.out.println(tableA);
        System.out.println("==========================================");
        tenv.createTemporaryView("tableB", orderB, $("user"), $("product"), $("amount"));

        // Query: rows of tableA with amount > 2 and rows of tableB with amount > 1, merged
        /*
        select * from tableA where amount > 2
        union
        select * from tableB where amount > 1
        */
        String sql = "select * from " + tableA + " where amount > 2 " +
                "union " +
                " select * from tableB where amount > 1";
        Table resultTable = tenv.sqlQuery(sql);
        resultTable.printSchema();
        // Prints UnnamedTable$1
        System.out.println(resultTable);
        System.out.println("==========================================");

        // Convert the Table back to a DataStream
        // With union all, toAppendStream can be used:
        //DataStream<Order> resultDS = tenv.toAppendStream(resultTable, Order.class);
        // With union, toRetractStream is needed
        DataStream<Tuple2<Boolean, Order>> resultDS = tenv.toRetractStream(resultTable, Order.class);
        // toAppendStream → appends the computed rows to the result DataStream
        // toRetractStream → emits each change against the earlier results as an add (true) or a retract (false) message
        // Similar to append/update/complete in Structured Streaming

        //TODO 3.sink
        resultDS.print();

        //TODO 4.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        public Long user;
        public String product;
        public int amount;
    }
}
Output:
root
|-- user: BIGINT
|-- product: STRING
|-- amount: INT
UnnamedTable$0
==========================================
root
|-- user: BIGINT
|-- product: STRING
|-- amount: INT
UnnamedTable$1
==========================================
8> (true,Demo01.Order(user=1, product=diaper, amount=4))
7> (true,Demo01.Order(user=1, product=beer, amount=3))
7> (true,Demo01.Order(user=2, product=pen, amount=3))
7> (true,Demo01.Order(user=2, product=rubber, amount=3))
Process finished with exit code 0
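The listing notes that union all can be converted with toAppendStream while union needs toRetractStream. One way to picture the difference is that UNION removes duplicate rows, so the operator has to remember which rows it has already passed on, whereas UNION ALL remembers nothing. The following plain-Java sketch illustrates only that idea; UnionSketch and its methods are hypothetical names and do not describe Flink's internal implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java illustration only; not Flink code.
final class UnionSketch {
    // UNION ALL: every incoming row is passed on; nothing has to be remembered.
    static List<String> unionAll(List<String> a, List<String> b) {
        List<String> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    // UNION: a row is passed on only the first time it is seen,
    // so the set of rows seen so far is state the operator must keep.
    static List<String> union(List<String> a, List<String> b) {
        Set<String> seen = new HashSet<>();
        List<String> out = new ArrayList<>();
        for (String row : unionAll(a, b)) {
            if (seen.add(row)) {
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("1,beer,3", "1,diaper,4");
        List<String> b = Arrays.asList("1,beer,3", "2,pen,3");
        System.out.println(unionAll(a, b));
        System.out.println(union(a, b));
    }
}
```

In the run shown above, the two inputs share no rows after filtering, which is why every message in the output carries true.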
16.6. Example 2
Implement WordCount in both the Table/DSL style and the SQL style.
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author tuzuoquan
 * @date 2022/6/2 16:02
 */
public class Demo02 {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        //TODO 1.source
        DataStream<WC> wordsDS = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 1)
        );

        //TODO 2.transformation
        // Register the DataStream as a View
        tenv.createTemporaryView("t_words", wordsDS, $("word"), $("frequency"));
        /*
         * select word,sum(frequency) as frequency
         * from t_words
         * group by word
         */
        String sql = "select word,sum(frequency) as frequency " +
                "from t_words " +
                "group by word";
        // Run the SQL query
        Table resultTable = tenv.sqlQuery(sql);
        // Convert the result to a DataStream
        DataStream<Tuple2<Boolean, WC>> resultDS = tenv.toRetractStream(resultTable, WC.class);
        // toAppendStream → appends the computed rows to the result DataStream
        // toRetractStream → emits each change against the earlier results as an add (true) or a retract (false) message
        // Similar to append/update/complete in Structured Streaming

        //TODO 3.sink
        resultDS.print();
        // Input:
        //new WC("Hello", 1),
        //new WC("World", 1),
        //new WC("Hello", 1)
        // Output:
        //(true,Demo02.WC(word=Hello, frequency=1))
        //(true,Demo02.WC(word=World, frequency=1))
        //(false,Demo02.WC(word=Hello, frequency=1))
        //(true,Demo02.WC(word=Hello, frequency=2))

        //TODO 4.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WC {
        public String word;
        public long frequency;
    }
}
Output:
7> (true,Demo02.WC(word=Hello, frequency=1))
4> (true,Demo02.WC(word=World, frequency=1))
7> (false,Demo02.WC(word=Hello, frequency=1))
7> (true,Demo02.WC(word=Hello, frequency=2))
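To read a retract stream such as the one above, a consumer can apply each message to a local view: a true message adds or overwrites the row for its key, and a false message removes the row that was emitted earlier. The following is a minimal plain-Java sketch of that bookkeeping, not Flink code; RetractFoldSketch, apply and snapshot are hypothetical names chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only; the names here are assumptions, not Flink API.
final class RetractFoldSketch {
    // The current view of the result table: word -> frequency.
    private final Map<String, Long> current = new LinkedHashMap<>();

    // Applies one retract-stream message to the view.
    void apply(boolean isAdd, String word, long frequency) {
        if (isAdd) {
            current.put(word, frequency);
        } else {
            // Remove the entry only if it still holds the retracted value.
            current.remove(word, frequency);
        }
    }

    // Returns a copy of the current view.
    Map<String, Long> snapshot() {
        return new LinkedHashMap<>(current);
    }

    public static void main(String[] args) {
        RetractFoldSketch view = new RetractFoldSketch();
        // The four messages from the output above, in order.
        view.apply(true, "Hello", 1);
        view.apply(true, "World", 1);
        view.apply(false, "Hello", 1);
        view.apply(true, "Hello", 2);
        System.out.println(view.snapshot());
    }
}
```

After the four messages the view holds Hello with frequency 2 and World with frequency 1, which is the result of the group by query over the three input rows.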
The Table/DSL-style version of the same WordCount:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Flink Table & SQL example: WordCount done in both the SQL style and the Table style.
 *
 * @author tuzuoquan
 * @date 2022/6/2 17:41
 */
public class Demo02_2 {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        //TODO 1.source
        DataStream<WC> wordsDS = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 1)
        );

        //TODO 2.transformation
        // Convert the DataStream to a Table
        Table table = tenv.fromDataStream(wordsDS);
        // Query in Table/DSL style: sum per word, then keep only words whose total is 2
        Table resultTable = table
                .groupBy($("word"))
                .select($("word"), $("frequency").sum().as("frequency"))
                .filter($("frequency").isEqual(2));
        // Convert the result to a DataStream
        DataStream<Tuple2<Boolean, WC>> resultDS = tenv.toRetractStream(resultTable, WC.class);

        //TODO 3.sink
        resultDS.print();

        //TODO 4.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WC {
        public String word;
        public long frequency;
    }
}
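16.7. Example 3
Requirement:
Use Flink SQL to compute, per user, the total number of orders, the maximum order amount, and the minimum order amount within 5 seconds.
In other words, every 5 seconds, compute each user's order count, maximum order amount, and minimum order amount over the last 5 seconds.
A time-based tumbling window from the stream-processing Window API would be enough for this requirement.
Here it is implemented with the Flink Table & SQL API instead.
Constraint: use event time + Watermark + the window support in Flink SQL and the Table API.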
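What a 5-second tumbling window computes for this requirement can be sketched in plain Java before looking at the Flink version. This is not Flink code and it ignores watermarks and late data entirely. TumblingWindowSketch, the parameter names userId and amount, and the use of non-negative epoch milliseconds for event time are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; not Flink code, no watermark handling.
final class TumblingWindowSketch {
    static final long WINDOW_MILLIS = 5_000L;

    // Start of the 5-second tumbling window containing the given event time
    // (assumes a non-negative timestamp in milliseconds).
    static long windowStart(long eventTimeMillis) {
        return eventTimeMillis - (eventTimeMillis % WINDOW_MILLIS);
    }

    // Per window and user: [order count, max amount, min amount].
    private final Map<String, long[]> stats = new TreeMap<>();

    // Adds one order to the window its event time falls into.
    void add(int userId, long amount, long eventTimeMillis) {
        String key = windowStart(eventTimeMillis) + "/" + userId;
        long[] s = stats.get(key);
        if (s == null) {
            stats.put(key, new long[] {1L, amount, amount});
        } else {
            s[0]++;
            s[1] = Math.max(s[1], amount);
            s[2] = Math.min(s[2], amount);
        }
    }

    // Returns [count, max, min] for the user in the window, or null if there were no orders.
    long[] get(int userId, long windowStartMillis) {
        return stats.get(windowStartMillis + "/" + userId);
    }

    public static void main(String[] args) {
        TumblingWindowSketch windows = new TumblingWindowSketch();
        windows.add(1, 10, 1_000);
        windows.add(1, 30, 4_999);
        windows.add(1, 5, 5_000); // falls into the next window
        System.out.println(Arrays.toString(windows.get(1, 0)));
        System.out.println(Arrays.toString(windows.get(1, 5_000)));
    }
}
```

Because tumbling windows do not overlap, each order belongs to exactly one window, so "every 5 seconds" and "the last 5 seconds" describe the same set of rows.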
package day4;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Flink Table & SQL example: order statistics using event time + Watermark + window.
 *
 * @author tuzuoquan
 * @date 2022/6/2 18:12
 */
public class Demo03 {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        //TODO 1.source
        DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
            private Boolean isRunning = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {