大数据(9h)FlinkSQL连MySQLKafka

Posted 小基基o_O

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(9h)FlinkSQL连MySQLKafka相关的知识,希望对你有一定的参考价值。

文章目录

环境:WIN10+JDK1.8+IDEA+Maven3.6.3+FlinkSQL1.13

INSERT 到 connector=print的表

pom.xml

<!-- Maven property placeholders must use ${...} syntax; bare $flink.version
     is passed through literally and breaks dependency resolution. -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- FlinkSQL (blink planner) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

Java代码

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Hi 
    public static void main(String[] args) 
        //创建流和表的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        //创建表,连接mysql
        tbEnv.executeSql("CREATE TABLE t1(a STRING,b INT)WITH ('connector' = 'print')");
        //写入数据
        tbEnv.executeSql("INSERT INTO t1 VALUES('ab',23)");
    

打印结果
+I[ab, 23]

FlinkSQL连MySQL

pom.xml

<!-- Maven property placeholders must use ${...} syntax; bare $flink.version
     is passed through literally and breaks dependency resolution. -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <mysql.version>8.0.31</mysql.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- FlinkSQL (blink planner) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Required for 'connector' = 'jdbc' -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Required for 'driver' = 'com.mysql.cj.jdbc.Driver' -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
</dependencies>

MySQL建表

-- Create (or replace) a one-row view in db1; it serves as the JDBC
-- source table ('table-name' = 'v') read by the Flink job below.
CREATE OR REPLACE VIEW db1.v AS
SELECT 'ab' AS a,23 AS b,6.78 AS c;

Java代码

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Hi 
    public static void main(String[] args) 
        //创建流和表的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        //创建表,连接MySQL表
        tbEnv.executeSql("CREATE TEMPORARY TABLE temp_tb (\\n" +
                "  a STRING,\\n" +
                "  b INT,\\n" +
                "  c DECIMAL(3,2))\\n" +
                "WITH (\\n" +
                "  'connector' = 'jdbc',\\n" +
                "  'driver' = 'com.mysql.cj.jdbc.Driver',\\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/db1',\\n" +
                "  'username' = 'root',\\n" +
                "  'password' = '123456',\\n" +
                "  'table-name' = 'v'\\n" +
                ")");
        //执行查询,打印
        tbEnv.sqlQuery("SELECT * FROM temp_tb").execute().print();
    

FlinkSQL连Kafka

Kafka => FlinkSQL => Kafka

创建主题

# Create source topic t1 (single partition, replication factor 1).
# A line continuation is a single backslash at end of line.
kafka-topics --create \
--bootstrap-server hadoop105:9092 \
--replication-factor 1  --partitions 1 \
--topic t1

# Create sink topic t2.
kafka-topics --create \
--bootstrap-server hadoop105:9092 \
--replication-factor 1  --partitions 1 \
--topic t2

依赖

<!-- NOTE(review): versions here are hard-coded to 1.13.1, while the earlier
     pom sections use 1.13.6 via a <flink.version> property — confirm which
     release is intended and prefer the property form for consistency. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- Streaming runtime -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- FlinkSQL (blink planner) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- Required for 'format'='json' -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- Required for 'connector'='kafka' -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.1</version>
</dependency>

Java代码

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class K2K 
    public static void main(String[] args) 
        // 1、创建流环境和流式表
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 2、注册SourceTable
        tableEnv.executeSql("CREATE TABLE tb1 (id STRING, ts BIGINT, vc INT) WITH ("
                + "'connector' = 'kafka',"
                + "'topic' = 'topic_source_sensor',"
                + "'properties.bootstrap.servers' = 'hadoop105:9092,hadoop106:9092,hadoop107:9092',"
                + "'properties.group.id' = 'g1',"
                + "'scan.startup.mode' = 'latest-offset',"
                + "'format' = 'json'"
                + ")");
        // 3、注册SinkTable
        tableEnv.executeSql("CREATE TABLE tb2 (id STRING, ts BIGINT, vc INT) WITH ("
                + "'connector' = 'kafka',"
                + "'topic' = 'topic_sink_sensor',"
                + "'properties.bootstrap.servers' = 'hadoop105:9092,hadoop106:9092,hadoop107:9092',"
                + "'format' = 'json'"
                + ")");
        // 4、查询SourceTable数据,写到SinkTable
        tableEnv.executeSql("INSERT INTO tb2 SELECT * FROM tb1 WHERE 1=1");
    

生产Kafka和消费Kafka

# Produce JSON records into the source topic t1 (one record per line,
# e.g. {"id":"s1","ts":1,"vc":10}); a line continuation is a single backslash.
kafka-console-producer \
--broker-list hadoop105:9092 \
--topic t1
# Consume from the sink topic t2 to observe the rows forwarded by Flink
kafka-console-consumer \
--bootstrap-server hadoop105:9092 \
--topic t2

开两个窗口查看效果

以上是关于大数据(9h)FlinkSQL连MySQLKafka的主要内容,如果未能解决你的问题,请参考以下文章

大数据(9h)FlinkSQL连MySQLKafka

大数据(9h)FlinkSQL

大数据(9h)FlinkSQL

大数据(9h)FlinkSQL双流JOIN

大数据(9h)FlinkSQL之Lookup Join

大数据(9h)FlinkSQL之Lookup Join