[Flink] Developing with Flink's Table API and SQL

Posted by 星欲冷hx


Contents

Why the Table API and SQL

API

Example

Add the pom dependencies

Run and check the result

Table union operation


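Introduction to the Table API and SQL

Why the Table API and SQL

The Table API is a SQL-like API. Because SQL is a declarative language, you describe what result you want and can process data without caring about the underlying implementation.

Apache Flink 1.12 Documentation: Concepts & Common API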

 

API

  1. Create a table
  2. Query the table
  3. Output the table

 

Example

Register a DataStream as a Table and as a view, then query them with SQL

Add the pom dependencies

 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-dataset-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>1.12.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.12.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.12.2</version>
        </dependency>
    </dependencies>

</project>

 

Code implementation
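The demo in this section imports cn.edu.hgu.flink.DataStream.model.Student, which the post does not show. The following is a minimal sketch of what such a class might look like, assuming three fields (id, name, age) that match the columns used in the demo. For Flink to treat it as a POJO it needs to be public, have a public no-argument constructor, and expose its fields through getters and setters. Since Lombok is in the pom, the original may well have used Lombok annotations instead of hand-written accessors; that is an assumption, and the field types here are a guess too.

```java
/**
 * Hypothetical Student POJO; the original class is not shown in the post.
 * In the demo project it would sit in package cn.edu.hgu.flink.DataStream.model
 * (the package declaration is omitted so this sketch compiles on its own).
 */
public class Student {
    private Integer id;
    private String name;
    private Integer age;

    /** Public no-argument constructor, required by Flink's POJO rules. */
    public Student() {
    }

    /** Convenience constructor matching the calls in the demo, e.g. new Student(101, "张三", 20). */
    public Student(Integer id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    /** Readable form for print() sinks. */
    @Override
    public String toString() {
        return "Student(id=" + id + ", name=" + name + ", age=" + age + ")";
    }
}
```

The field names matter: the demo refers to the columns as id, name and age, and converting a Table back to Student.class maps columns to fields by name.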

 

 

package cn.edu.hgu.flink.table;

import cn.edu.hgu.flink.DataStream.model.Student;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Demo of Flink's Table API and SQL
 */
public class FlinkTableDemo {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. table env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // 3. source
        DataStream<Student> studentA = env.fromCollection(Arrays.asList(
                new Student(101, "张三", 20),
                new Student(102, "李四", 18),
                new Student(103, "王五", 19)
        ));
        DataStream<Student> studentB = env.fromCollection(Arrays.asList(
                new Student(201, "赵六", 21),
                new Student(202, "钱七", 19),
                new Student(203, "孙八", 18)
        ));
        // register the tables
        // 3.1 convert a stream into a Table object
        Table tableA = tEnv.fromDataStream(studentA, $("id"), $("name"), $("age"));
        // 3.2 register a stream as a view named StudentB
        tEnv.createTemporaryView("StudentB", studentB, $("id"), $("name"), $("age"));
        // 4. transformation
        Table tableB = tEnv.sqlQuery("select * from StudentB where age>=19"); // SQL
        // union: concatenating tableA into the SQL string registers it under a generated name
        Table unionTable = tEnv.sqlQuery(
                "select * from " + tableA + " where age<19 " +
                        "union all " +
                        "select * from StudentB where age >= 19"
        );
        // 5. sink
        // convert the tables back into streams
        DataStream<Student> resultA = tEnv.toAppendStream(tableA, Student.class);
        DataStream<Student> resultB = tEnv.toAppendStream(tableB, Student.class);
        DataStream<Student> resultUnion = tEnv.toAppendStream(unionTable, Student.class);
//        tableA.printSchema();
//        tableB.printSchema();
//        resultA.print();
//        resultB.print();
        resultUnion.print();
        // 6. execute
        env.execute();
    }
}

 

Run and check the result

 

Table union operation
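As a rough cross-check of what the union query should select, here is a plain-Java sketch that applies the same two filters and the same concatenation to the sample data from the demo. No Flink is involved: UnionPreview and Row are hypothetical names introduced only for this illustration, and the sketch says nothing about how Flink executes the query.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Plain-Java preview of the rows selected by the union all query; not Flink code. */
public class UnionPreview {

    /** Minimal stand-in for a Student row (hypothetical, for this sketch only). */
    static final class Row {
        final int id;
        final String name;
        final int age;

        Row(int id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return "Row(id=" + id + ", name=" + name + ", age=" + age + ")";
        }
    }

    /** Mirrors: select * from A where age < 19 union all select * from B where age >= 19. */
    static List<Row> unionAll(List<Row> a, List<Row> b) {
        return Stream.concat(
                a.stream().filter(r -> r.age < 19),
                b.stream().filter(r -> r.age >= 19)
        ).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same sample data as the two collections in the demo
        List<Row> studentA = Arrays.asList(
                new Row(101, "张三", 20),
                new Row(102, "李四", 18),
                new Row(103, "王五", 19));
        List<Row> studentB = Arrays.asList(
                new Row(201, "赵六", 21),
                new Row(202, "钱七", 19),
                new Row(203, "孙八", 18));
        unionAll(studentA, studentB).forEach(System.out::println);
    }
}
```

With the demo data this selects three rows: 李四 (102) from the first collection, and 赵六 (201) and 钱七 (202) from the second. The Flink job should emit the same rows, though their order and print format may differ.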

 

The next chapter covers Flink's four cornerstones.
