Big Data (9h): FlinkSQL connecting to MySQL and Kafka
Posted by 小基基o_O
Environment: Windows 10 + JDK 1.8 + IDEA + Maven 3.6.3 + Flink SQL 1.13
INSERT into a table with connector = print
pom.xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- FlinkSQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Java code
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Hi {
    public static void main(String[] args) {
        // Create the stream execution environment and the table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        // Create a table backed by the print connector
        tbEnv.executeSql("CREATE TABLE t1 (a STRING, b INT) WITH ('connector' = 'print')");
        // Write a row into it
        tbEnv.executeSql("INSERT INTO t1 VALUES ('ab', 23)");
    }
}
Printed result:
+I[ab, 23]
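For an INSERT statement, executeSql() submits the job and returns a TableResult immediately, without waiting for the job to finish. If you want the client to block until the insert completes (for example before running follow-up statements), a minimal sketch, assuming main is declared with throws Exception, looks like this:

import org.apache.flink.table.api.TableResult;

// ... inside main(), in place of the plain executeSql call above:
TableResult result = tbEnv.executeSql("INSERT INTO t1 VALUES ('ab', 23)");
result.await();  // blocks until the insert job finishes; throws if the job fails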
FlinkSQL connecting to MySQL
pom.xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <mysql.version>8.0.31</mysql.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- FlinkSQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 'connector' = 'jdbc' -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 'driver' = 'com.mysql.cj.jdbc.Driver' -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
</dependencies>
Create the MySQL source (a view is used here)
CREATE OR REPLACE VIEW db1.v AS
SELECT 'ab' AS a,23 AS b,6.78 AS c;
Java code
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Hi {
    public static void main(String[] args) {
        // Create the stream execution environment and the table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        // Create a temporary table over the MySQL view via the JDBC connector
        tbEnv.executeSql("CREATE TEMPORARY TABLE temp_tb (\n" +
                "  a STRING,\n" +
                "  b INT,\n" +
                "  c DECIMAL(3,2))\n" +
                "WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/db1',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'table-name' = 'v'\n" +
                ")");
        // Run a query and print the result
        tbEnv.sqlQuery("SELECT * FROM temp_tb").execute().print();
    }
}
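The view above is read-only; to write from FlinkSQL into MySQL with the same JDBC connector you would point a second table at a real, writable MySQL table. A minimal sketch, assuming a hypothetical MySQL table db1.t_sink(a VARCHAR, b INT, c DECIMAL(3,2)) already exists:

// Register a sink table over the hypothetical, writable MySQL table db1.t_sink
tbEnv.executeSql("CREATE TEMPORARY TABLE sink_tb (\n" +
        "  a STRING,\n" +
        "  b INT,\n" +
        "  c DECIMAL(3,2))\n" +
        "WITH (\n" +
        "  'connector' = 'jdbc',\n" +
        "  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
        "  'url' = 'jdbc:mysql://localhost:3306/db1',\n" +
        "  'username' = 'root',\n" +
        "  'password' = '123456',\n" +
        "  'table-name' = 't_sink'\n" +
        ")");
// Copy rows from the source (the view) into the MySQL table
tbEnv.executeSql("INSERT INTO sink_tb SELECT * FROM temp_tb");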
FlinkSQL connecting to Kafka
Kafka => FlinkSQL => Kafka
Create the topics
kafka-topics --create \
  --bootstrap-server hadoop105:9092 \
  --replication-factor 1 --partitions 1 \
  --topic t1
kafka-topics --create \
  --bootstrap-server hadoop105:9092 \
  --replication-factor 1 --partitions 1 \
  --topic t2
Dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- Streaming -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- FlinkSQL -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- 'format'='json' -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- 'connector'='kafka' -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
Java code
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class K2K {
    public static void main(String[] args) {
        // 1. Create the stream execution environment and the table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 2. Register the source table (reads JSON records from Kafka topic t1)
        tableEnv.executeSql("CREATE TABLE tb1 (id STRING, ts BIGINT, vc INT) WITH ("
                + "'connector' = 'kafka',"
                + "'topic' = 't1',"
                + "'properties.bootstrap.servers' = 'hadoop105:9092,hadoop106:9092,hadoop107:9092',"
                + "'properties.group.id' = 'g1',"
                + "'scan.startup.mode' = 'latest-offset',"
                + "'format' = 'json'"
                + ")");
        // 3. Register the sink table (writes JSON records to Kafka topic t2)
        tableEnv.executeSql("CREATE TABLE tb2 (id STRING, ts BIGINT, vc INT) WITH ("
                + "'connector' = 'kafka',"
                + "'topic' = 't2',"
                + "'properties.bootstrap.servers' = 'hadoop105:9092,hadoop106:9092,hadoop107:9092',"
                + "'format' = 'json'"
                + ")");
        // 4. Query the source table and write the result into the sink table
        tableEnv.executeSql("INSERT INTO tb2 SELECT * FROM tb1 WHERE 1=1");
    }
}
Produce to Kafka and consume from Kafka
kafka-console-producer \
  --broker-list hadoop105:9092 \
  --topic t1
kafka-console-consumer \
  --bootstrap-server hadoop105:9092 \
  --topic t2
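Since both tables use 'format' = 'json', each line typed into the console producer should be a JSON object matching the schema (id STRING, ts BIGINT, vc INT). For example (the values here are made up for illustration), entering the following line in the producer window:

{"id":"sensor_1","ts":1000,"vc":9}

should make the same JSON record appear in the consumer window reading from t2.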
Open the two terminal windows side by side to watch the records flow through.