大数据(9h)FlinkSQL

Posted 小基基o_O

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(9h)FlinkSQL相关的知识,希望对你有一定的参考价值。

文章目录

环境:WIN10+IDEA+Flink1.13

概述

Flink有两种关系型API来做流批统一处理:Table API 和 SQL

  • 动态表(Dynamic Tables)是Flink支持流数据的 Table API 和 SQL 的核心概念
    动态表是随时间变化的表
    查询动态表将生成一个连续查询(Continuous Query)

动态表

连续查询

额外依赖

<!-- Table planner (Blink) — required for the Table API / SQL runtime -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Scala streaming bridge required by the planner -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Enables 'format' = 'csv' in connector DDL -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Enables 'format' = 'json' in connector DDL -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

Table API大概就是下面这种比较丑的写法,本文不予详述

// Table API example: look up the registered table "student",
// group by id and count rows per id (result column aliased "cnt").
// NOTE(review): assumes a StreamTableEnvironment `tableEnv` and a
// registered table named "student" exist — not shown in this snippet.
Table studentTable = tableEnv.from("student");
Table resultTable = studentTable
    .groupBy($("id"))
    .select($("id"), $("id").count().as("cnt"));

FlinkSQL代码

pom.xml

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
    <!-- Flink core + streaming + SQL planner + client -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Lombok: generates boilerplate (getters/setters/etc.) at compile time -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.16</version>
    </dependency>
</dependencies>

JavaBean

Lombok通过注解的方式 简化JavaBean代码
@Data:自动生成set、get、toString、equals、hashCode等方法

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * sid:学号
 * age:年龄
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student 
    private String sid;
    private Integer age;

查询

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Hello 
    public static void main(String[] args) throws Exception 
        // 1、创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Student> StudentStream =
                env.fromElements(
                        new Student("s01", 9),
                        new Student("s02", 11),
                        new Student("s03", 11),
                        new Student("s04", 12),
                        new Student("s05", 9));
        // 2、创建流式表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 3、数据流=>动态表
        Table inputTable = tableEnv.fromDataStream(StudentStream);
        // 4、查询未注册的表
        Table resultTable = tableEnv.sqlQuery("SELECT * FROM " + inputTable + " WHERE age>9");
        // 5、动态表=>数据流=>打印
        tableEnv.toDataStream(resultTable).print();
        // 6、执行
        env.execute();
    

打印结果

+I[s02, 11]
+I[s03, 11]
+I[s04, 12]

使用临时视图

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Hello3 
    public static void main(String[] args) throws Exception 
        // 1、创建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Tuple2<String, Integer>> cStream =
                env.fromElements(
                        Tuple2.of("c1", 5),
                        Tuple2.of("c2", 8),
                        Tuple2.of("c1", 5),
                        Tuple2.of("c2", 9));
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 2、数据量=>动态表
        Table inputTable = tableEnv.fromDataStream(cStream, $("c"), $("vol"));
        // 3、创建临时视图
        tableEnv.createTemporaryView("t", inputTable);
        // 4、查询临时视图,返回结果表
        Table resultTable = tableEnv.sqlQuery("SELECT c,SUM(vol) FROM t GROUP BY c");
        // 5、打印输出
        tableEnv.toChangelogStream(resultTable).print();
        // 6、执行
        env.execute();
    

打印结果

+I[c1, 5]
+I[c2, 8]
-U[c1, 5]
+U[c1, 10]
-U[c2, 8]
+U[c2, 17]

Kafka到Kafka

创建主题

# Create the source topic t1 and sink topic t2 (single partition, no replication)
kafka-topics --create \
--bootstrap-server hadoop105:9092 \
--replication-factor 1  --partitions 1 \
--topic t1

kafka-topics --create \
--bootstrap-server hadoop105:9092 \
--replication-factor 1  --partitions 1 \
--topic t2

Java代码

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Streaming runtime -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- FlinkSQL planner -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Enables 'format'='json' in connector DDL -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Enables 'connector'='kafka' in connector DDL -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class K2K 
    public static void main(String[] args) 
        // 1、创建流环境和流式表
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 2、注册SourceTable
        tableEnv.executeSql("CREATE TABLE tb1 (id STRING, ts BIGINT, vc INT) WITH ("
                + "'connector' = 'kafka',"
                + "'topic' = 'topic_source_sensor',"
                + "'properties.bootstrap.servers' = 'hadoop105:9092,hadoop106:9092,hadoop107:9092',"
                + "'properties.group.id' = 'g1',"
                + "'scan.startup.mode' = 'latest-offset',"
                + "'format' = 'json'"
                + ")");
        // 3、注册SinkTable
        tableEnv.executeSql("CREATE TABLE tb2 (id STRING, ts BIGINT, vc INT) WITH ("
                + "'connector' = 'kafka',"
                + "'topic' = 'topic_sink_sensor',"
                + "'properties.bootstrap.servers' = 'hadoop105:9092,hadoop106:9092,hadoop107:9092',"
                + "'format' = 'json'"
                + ")");
        // 4、查询SourceTable数据,写到SinkTable
        tableEnv.executeSql("INSERT INTO tb2 SELECT * FROM tb1 WHERE 1=1");
    

生产Kafka和消费Kafka

# Produce test records into the source topic t1
kafka-console-producer \
--broker-list hadoop105:9092 \
--topic t1
# Consume the pipeline output from the sink topic t2
kafka-console-consumer \
--bootstrap-server hadoop105:9092 \
--topic t2

开两个窗口查看效果

以上是关于大数据(9h)FlinkSQL的主要内容,如果未能解决你的问题,请参考以下文章

大数据(9h)FlinkSQL连MySQLKafka

大数据(9h)FlinkSQL

大数据(9h)FlinkSQL

大数据(9h)FlinkSQL双流JOIN

大数据(9h)FlinkSQL之Lookup Join

大数据(9h)FlinkSQL之Lookup Join