大数据(9h)FlinkSQL
Posted 小基基o_O
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(9h)FlinkSQL相关的知识,希望对你有一定的参考价值。
文章目录
环境:WIN10+IDEA+Flink1.13
概述
Flink有两种关系型API来做流批统一处理:Table API和SQL
- 动态表(Dynamic Tables)是Flink支持流数据的Table API和SQL的核心概念
动态表是随时间变化的
查询动态表将生成一个连续查询(Continuous Query)
动态表
连续查询
额外依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
Table API大概就是下面这种比较丑的写法,本文不予详述
Table studentTable = tableEnv.from("student");
Table resultTable = studentTable
.groupBy($("id"))
.select($("id"), $("id").count().as("cnt"));
FlinkSQL代码
pom.xml
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
</dependencies>
JavaBean
Lombok通过注解的方式 简化JavaBean代码
@Data
:自动生成set
、get
、toString
、equals
、hashCode
等方法
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* sid:学号
* age:年龄
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student
private String sid;
private Integer age;
查询
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class Hello
public static void main(String[] args) throws Exception
// 1、创建流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Student> StudentStream =
env.fromElements(
new Student("s01", 9),
new Student("s02", 11),
new Student("s03", 11),
new Student("s04", 12),
new Student("s05", 9));
// 2、创建流式表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 3、数据流=>动态表
Table inputTable = tableEnv.fromDataStream(StudentStream);
// 4、查询未注册的表
Table resultTable = tableEnv.sqlQuery("SELECT * FROM " + inputTable + " WHERE age>9");
// 5、动态表=>数据流=>打印
tableEnv.toDataStream(resultTable).print();
// 6、执行
env.execute();
打印结果
+I[s02, 11]
+I[s03, 11]
+I[s04, 12]
使用临时视图
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class Hello3
public static void main(String[] args) throws Exception
// 1、创建流环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Tuple2<String, Integer>> cStream =
env.fromElements(
Tuple2.of("c1", 5),
Tuple2.of("c2", 8),
Tuple2.of("c1", 5),
Tuple2.of("c2", 9));
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2、数据量=>动态表
Table inputTable = tableEnv.fromDataStream(cStream, $("c"), $("vol"));
// 3、创建临时视图
tableEnv.createTemporaryView("t", inputTable);
// 4、查询临时视图,返回结果表
Table resultTable = tableEnv.sqlQuery("SELECT c,SUM(vol) FROM t GROUP BY c");
// 5、打印输出
tableEnv.toChangelogStream(resultTable).print();
// 6、执行
env.execute();
打印结果
+I[c1, 5]
+I[c2, 8]
-U[c1, 5]
+U[c1, 10]
-U[c2, 8]
+U[c2, 17]
Kafka到Kafka
创建主题
kafka-topics --create \
--bootstrap-server hadoop105:9092 \
--replication-factor 1 --partitions 1 \
--topic t1
kafka-topics --create \
--bootstrap-server hadoop105:9092 \
--replication-factor 1 --partitions 1 \
--topic t2
Java代码
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 流 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- FlinkSQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 'format'='json' -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 'connector'='kafka' -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class K2K
public static void main(String[] args)
// 1、创建流环境和流式表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2、注册SourceTable
tableEnv.executeSql("CREATE TABLE tb1 (id STRING, ts BIGINT, vc INT) WITH ("
+ "'connector' = 'kafka',"
+ "'topic' = 'topic_source_sensor',"
+ "'properties.bootstrap.servers' = 'hadoop105:9092,hadoop106:9092,hadoop107:9092',"
+ "'properties.group.id' = 'g1',"
+ "'scan.startup.mode' = 'latest-offset',"
+ "'format' = 'json'"
+ ")");
// 3、注册SinkTable
tableEnv.executeSql("CREATE TABLE tb2 (id STRING, ts BIGINT, vc INT) WITH ("
+ "'connector' = 'kafka',"
+ "'topic' = 'topic_sink_sensor',"
+ "'properties.bootstrap.servers' = 'hadoop105:9092,hadoop106:9092,hadoop107:9092',"
+ "'format' = 'json'"
+ ")");
// 4、查询SourceTable数据,写到SinkTable
tableEnv.executeSql("INSERT INTO tb2 SELECT * FROM tb1 WHERE 1=1");
生产Kafka和消费Kafka
kafka-console-producer \
--broker-list hadoop105:9092 \
--topic t1
kafka-console-consumer \
--bootstrap-server hadoop105:9092 \
--topic t2
开两个窗口查看效果
以上是关于大数据(9h)FlinkSQL的主要内容,如果未能解决你的问题,请参考以下文章