16. Flink Table & SQL: Why Table & SQL Is Needed

Posted on 涂作权's blog

16. Flink Table & SQL
16.1. Why Table & SQL is needed
16.2. History
16.3. API
16.4. Core concepts
16.5. Case 1
16.6. Case 2
16.7. Case 3
16.8. Case 4

16. Flink Table & SQL

16.1. Why Table & SQL is needed

16.2. History

16.3. API

  • Dependencies
    https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/

  • Program structure
    https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html

  • Creating the environment
    https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html

  • Creating tables
    https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html

  • Queries
    https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html

  • Integration with DataStream
    https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html

16.4. Core concepts

  • Dynamic tables / unbounded tables
  • Continuous queries / these rely on State
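
The two concepts above can be illustrated without a Flink runtime. The following is a minimal plain-Java sketch, not Flink code: it assumes a "dynamic table" is simply a sequence of rows arriving one at a time, and it models a continuous query (a grouped sum) that keeps its running result in a map standing in for State. The class name RetractSketch and the method continuousWordCount are hypothetical names chosen for this illustration. Every time a key's result changes, the sketch first retracts the old result (false) and then emits the new one (true), which is the shape of the retract stream used in the cases below.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; this is not part of the Flink API.
class RetractSketch {

    // Evaluates "select word, sum(frequency) from t group by word" one input row
    // at a time (each row has frequency 1) and returns the change messages.
    static List<String> continuousWordCount(String[] words) {
        Map<String, Long> state = new HashMap<>(); // running sum per key, kept between rows
        List<String> changes = new ArrayList<>();
        for (String word : words) {
            Long previous = state.get(word);
            if (previous != null) {
                // The result emitted earlier for this key is now outdated: retract it.
                changes.add("(false," + word + "=" + previous + ")");
            }
            long updated = (previous == null ? 0L : previous) + 1L;
            state.put(word, updated);
            // Emit the new result for this key.
            changes.add("(true," + word + "=" + updated + ")");
        }
        return changes;
    }

    public static void main(String[] args) {
        for (String change : continuousWordCount(new String[]{"Hello", "World", "Hello"})) {
            System.out.println(change);
        }
    }
}
```

For the input Hello, World, Hello the sketch prints (true,Hello=1), (true,World=1), (false,Hello=1), (true,Hello=2). The map is the reason a continuous query needs State: without the previous sum there would be nothing to retract or to add to.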

16.5. Case 1

Convert DataStream data into a Table and a View, then run a statistical query with SQL.

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Flink Table & SQL demo: convert DataStream data into a Table.
 *
 * @author tuzuoquan
 * @date 2022/6/2 14:50
 */
public class Demo01 {

    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        //TODO 1.source
        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2)));

        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                new Order(2L, "pen", 3),
                new Order(2L, "rubber", 3),
                new Order(4L, "beer", 1)));

        //TODO 2.transformation
        // Convert the DataStreams into a Table and a View, then query them
        Table tableA = tenv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
        tableA.printSchema();
        System.out.println(tableA);

        System.out.println("==========================================");

        tenv.createTemporaryView("tableB", orderB, $("user"), $("product"), $("amount"));

        // Query: merge the rows of tableA with amount > 2 and the rows of tableB with amount > 1
        /*
         select * from tableA where amount > 2
         union
         select * from tableB where amount > 1
         */
        String sql = "select * from " + tableA + " where amount > 2 " +
                "union " +
                " select * from tableB where amount > 1";

        Table resultTable = tenv.sqlQuery(sql);
        resultTable.printSchema();
        //UnnamedTable$1
        System.out.println(resultTable);

        System.out.println("==========================================");

        // Convert the Table back into a DataStream
        //DataStream<Order> resultDS = tenv.toAppendStream(resultTable, Order.class);// with union all, use toAppendStream
        // with union (which removes duplicates), use toRetractStream
        DataStream<Tuple2<Boolean, Order>> resultDS = tenv.toRetractStream(resultTable, Order.class);
        //toAppendStream  → appends each computed row to the result DataStream
        //toRetractStream → emits each change against the rows already sent: true adds/updates a row, false deletes one
        // Similar to append/update/complete in Structured Streaming

        //TODO 3.sink
        resultDS.print();

        //TODO 4.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        public Long user;
        public String product;
        public int amount;
    }
}

Output:

root
 |-- user: BIGINT
 |-- product: STRING
 |-- amount: INT

UnnamedTable$0
==========================================
root
 |-- user: BIGINT
 |-- product: STRING
 |-- amount: INT

UnnamedTable$1
==========================================
8> (true,Demo01.Order(user=1, product=diaper, amount=4))
7> (true,Demo01.Order(user=1, product=beer, amount=3))
7> (true,Demo01.Order(user=2, product=pen, amount=3))
7> (true,Demo01.Order(user=2, product=rubber, amount=3))

Process finished with exit code 0
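
To check the four rows above by hand, the query can be mirrored in plain Java. This is only a sketch of the SQL semantics (filter, then UNION with duplicate elimination versus UNION ALL without it), not of how Flink executes the query; the class UnionSketch and its helper methods are hypothetical names, and a row is modelled as a List so that List.equals provides row equality.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration class; this is not part of the Flink API.
class UnionSketch {

    // A row is modelled as List(user, product, amount).
    static List<Object> row(long user, String product, int amount) {
        return Arrays.<Object>asList(user, product, amount);
    }

    // "where amount > minAmount": keeps rows whose amount is strictly greater.
    static List<List<Object>> filter(List<List<Object>> rows, int minAmount) {
        List<List<Object>> kept = new ArrayList<>();
        for (List<Object> r : rows) {
            int amount = (Integer) r.get(2);
            if (amount > minAmount) {
                kept.add(r);
            }
        }
        return kept;
    }

    // UNION ALL: plain concatenation, duplicates are kept.
    static List<List<Object>> unionAll(List<List<Object>> a, List<List<Object>> b) {
        List<List<Object>> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    // UNION: concatenation followed by duplicate elimination.
    static Set<List<Object>> union(List<List<Object>> a, List<List<Object>> b) {
        return new LinkedHashSet<>(unionAll(a, b));
    }

    public static void main(String[] args) {
        List<List<Object>> orderA = Arrays.asList(
                row(1L, "beer", 3), row(1L, "diaper", 4), row(3L, "rubber", 2));
        List<List<Object>> orderB = Arrays.asList(
                row(2L, "pen", 3), row(2L, "rubber", 3), row(4L, "beer", 1));
        for (List<Object> r : union(filter(orderA, 2), filter(orderB, 1))) {
            System.out.println(r);
        }
    }
}
```

The sketch keeps the same four orders as the Flink job: (3, rubber, 2) fails amount > 2 and (4, beer, 1) fails amount > 1. Because union has to remember which rows it has already seen, its result can change as input arrives, which is consistent with the listing using toRetractStream for union and toAppendStream only for union all.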

16.6. Case 2

Implement WordCount in both Table/DSL style and SQL style.

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Flink Table & SQL demo: WordCount in SQL style.
 *
 * @author tuzuoquan
 * @date 2022/6/2 16:02
 */
public class Demo02 {

    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        //TODO 1.source
        DataStream<WC> wordsDS = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 1)
        );

        //TODO 2.transformation
        // Register the DataStream as a View (or convert it into a Table)
        tenv.createTemporaryView("t_words", wordsDS, $("word"), $("frequency"));

        /*
         select word,sum(frequency) as frequency
         from t_words
         group by word
         */
        String sql = "select word,sum(frequency) as frequency " +
                "from t_words " +
                "group by word";

        // Run the SQL query
        Table resultTable = tenv.sqlQuery(sql);

        // Convert the result into a DataStream
        DataStream<Tuple2<Boolean, WC>> resultDS = tenv.toRetractStream(resultTable, WC.class);
        //toAppendStream  → appends each computed row to the result DataStream
        //toRetractStream → emits each change against the rows already sent: true adds/updates a row, false deletes one
        // Similar to append/update/complete in Structured Streaming

        //TODO 3.sink
        resultDS.print();
        // Input:
        //new WC("Hello", 1),
        //new WC("World", 1),
        //new WC("Hello", 1)
        // Output:
        //(true,Demo02.WC(word=Hello, frequency=1))
        //(true,Demo02.WC(word=World, frequency=1))
        //(false,Demo02.WC(word=Hello, frequency=1))
        //(true,Demo02.WC(word=Hello, frequency=2))

        //TODO 4.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WC {
        public String word;
        public long frequency;
    }
}

Output:

7> (true,Demo02.WC(word=Hello, frequency=1))
4> (true,Demo02.WC(word=World, frequency=1))
7> (false,Demo02.WC(word=Hello, frequency=1))
7> (true,Demo02.WC(word=Hello, frequency=2))

The same WordCount in Table/DSL style:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Flink Table & SQL demo: WordCount done in both SQL and Table style (this class is the Table version).
 *
 * @author tuzuoquan
 * @date 2022/6/2 17:41
 */
public class Demo02_2 {

    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        //TODO 1.source
        DataStream<WC> wordsDS = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 1)
        );

        //TODO 2.transformation
        // Convert the DataStream into a Table (or register it as a View)
        Table table = tenv.fromDataStream(wordsDS);

        // Query in Table/DSL style: group by word, sum the frequencies, keep only words with a total of 2
        Table resultTable = table
                .groupBy($("word"))
                .select($("word"), $("frequency").sum().as("frequency"))
                .filter($("frequency").isEqual(2));

        // Convert the result into a DataStream
        DataStream<Tuple2<Boolean, WC>> resultDS = tenv.toRetractStream(resultTable, WC.class);

        //TODO 3.sink
        resultDS.print();

        //TODO 4.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WC {
        public String word;
        public long frequency;
    }
}

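16.7. Case 3

Requirement:

Use Flink SQL to compute, per user and per 5 seconds, the total number of orders, the maximum order amount, and the minimum order amount.
In other words, every 5 seconds, compute each user's order count, maximum order amount, and minimum order amount over the most recent 5 seconds.
In plain stream processing, a time-based tumbling window is enough to meet this requirement.
Here it is implemented with the Flink Table & SQL API instead.

Constraint: use event time + Watermark + the window support in Flink SQL and the Table API.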
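
Before the Flink listing, the requirement itself can be worked through in plain Java. The sketch below is not Flink code and makes several assumptions: timestamps are non-negative epoch milliseconds, the window has no offset, events are never late (so no watermark handling is modelled), and the sample orders are made up for illustration. The names TumblingWindowSketch, windowStart, and aggregate are hypothetical. It assigns each order to the 5-second tumbling window containing its event time and keeps count, max, and min per (user, window).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class; this is not part of the Flink API.
class TumblingWindowSketch {

    static final long WINDOW_SIZE_MS = 5_000L;

    // Start of the tumbling window [start, start + 5000) that contains the timestamp.
    // Assumes a non-negative timestamp and no window offset.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Aggregates {count, max, min} per (user, window) key.
    // Each order is {userId, amount, eventTimeMs}.
    static Map<String, long[]> aggregate(long[][] orders) {
        Map<String, long[]> result = new LinkedHashMap<>();
        for (long[] o : orders) {
            String key = "user=" + o[0] + ",window=" + windowStart(o[2]);
            long[] agg = result.get(key);
            if (agg == null) {
                // First order for this user in this window.
                result.put(key, new long[]{1L, o[1], o[1]});
            } else {
                agg[0] += 1;
                agg[1] = Math.max(agg[1], o[1]);
                agg[2] = Math.min(agg[2], o[1]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample orders: {userId, amount, eventTimeMs}
        long[][] orders = {
                {1, 30, 1_000}, {1, 80, 4_999}, {2, 50, 2_000}, {1, 10, 5_000}
        };
        for (Map.Entry<String, long[]> e : aggregate(orders).entrySet()) {
            long[] a = e.getValue();
            System.out.println(e.getKey() + " -> count=" + a[0] + ", max=" + a[1] + ", min=" + a[2]);
        }
    }
}
```

With the sample data, user 1 has two orders in the window starting at 0 (max 80, min 30) and one in the window starting at 5000, since an event at exactly 5000 ms belongs to the next window. In a real job the watermark decides when a window's result may be emitted; the sketch skips that step by processing a complete, finite input.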

package day4;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Flink Table & SQL demo: order statistics using event time + Watermark + window.
 *
 * @author tuzuoquan
 * @date 2022/6/2 18:12
 */
public class Demo03 {

    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        //TODO 1.source
        DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
            private Boolean isRunning = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {