从0到1Flink的成长之路-Table API& SQL入门案例

Posted 熊老二-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0到1Flink的成长之路-Table API& SQL入门案例相关的知识,希望对你有一定的参考价值。

入门案例

依赖
依赖

<!-- Either... -->
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-api-java-bridge_2.11</artifactId>
 <version>1.10.0</version>
 <scope>provided</scope>
</dependency>
<!-- or... -->
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
 <version>1.10.0</version>
 <scope>provided</scope>
</dependency>
<!-- Either... (for the old planner that was available before Flink 1.9) -->
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-planner_2.11</artifactId>
 <version>1.10.0</version>
 <scope>provided</scope>
</dependency>
<!-- or.. (for the new Blink planner) -->
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-planner-blink_2.11</artifactId>
 <version>1.10.0</version>
 <scope>provided</scope>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-scala_2.11</artifactId>
 <version>1.10.0</version>
 <scope>provided</scope>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-common</artifactId>
 <version>1.10.0</version>
 <scope>provided</scope>
</dependency>

flink-table-common:这个包中主要是包含 Flink Planner 和 Blink Planner一些共用的代码。
flink-table-api-java:这部分是用户编程使用的 API,包含了大部分的 API。
flink-table-api-scala:这里只是非常薄的一层,仅和 Table API 的 Expression 和 DSL 相关。
两个 Planner:flink-table-planner 和 flink-table-planner-blink。
两个 Bridge:flink-table-api-scala-bridge 和 flink-table-api-java-bridge。
Flink Planner 和 Blink Planner 都会依赖于具体的 JAVA API,也会依赖于具体的 Bridge,通过Bridge 可以将 API 操作相应的转化为Scala 的 DataStream、DataSet,或者转化为 JAVA 的DataStream 或者Data Set。

API
注意:目前新版本(当前使用版本Flink 1.10)Flink的Table和SQL的API还不够稳定,依然在不断完善中,所以课程中的案例还是以老版本文档的API来演示。

获取环境
在这里插入图片描述
环境

old-API:old-API

在这里插入图片描述
new-API:new-API

在这里插入图片描述
程序结构
old-API:old-API
在这里插入图片描述
new-API:new-API

在这里插入图片描述
入门案例:批处理
基于Flink Table API和SQL实现词频统计WordCount程序。为了方便处理,将数据封装在
WordCount实体类,代码如下:

package xx.xxxxxx.flink.start.batch;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WordCount {
private String word ;
private Long counts;
}

在这里插入图片描述
SQL 实现
Flink SQL API 针对批处理实现词频统计WordCount,代码如下所示:

package xx.xxxxxx.flink.start.batch;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
/**
* Flink SQL API 针对批处理实现词频统计WordCount
*/
public class BatchWordCountSQLDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment() ;
env.setParallelism(2);
// TODO:构建Table执行环境
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// 2. 数据源-source
// 模拟测试数据集
DataSource<WordCount> inputDataSet = env.fromElements(
new WordCount("flink", 1L), new WordCount("flink", 1L),
new WordCount("spark", 1L), new WordCount("spark", 1L),
new WordCount("flink", 1L), new WordCount("hive", 1L),
new WordCount("flink", 1L), new WordCount("spark", 1L)
);
// TODO:注册并指定字段名称
tableEnv.createTemporaryView("word_count", inputDataSet, "word, counts");
// 3. 数据转换-transformation
// 按照单词分组并统计单词次数,然后过滤排序
String sql = "SELECT word, SUM(counts) AS counts FROM word_count GROUP BY word " +
"HAVING SUM(counts) >= 2 ORDER BY counts DESC" ;
// 执行查询
Table wcTable = tableEnv.sqlQuery(sql);
// 将Table转换为DataSet
DataSet<WordCount> resultDataSet = tableEnv.toDataSet(wcTable, WordCount.class);
// 4. 数据终端-sink
resultDataSet.printToErr();
}
}

Table API 实现

Flink Table API 针对批处理实现词频统计WordCount,完整代码如下所示:

package xx.xxxxxx.flink.start.batch;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
/**
* Flink Table API 针对批处理实现词频统计WordCount
*/
public class BatchWordCountTableDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment() ;
env.setParallelism(2);
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// 2. 数据源-source
// 模拟测试数据集
DataSource<WordCount> inputDataSet = env.fromElements(
new WordCount("flink", 1L), new WordCount("flink", 1L),
new WordCount("spark", 1L), new WordCount("spark", 1L),
new WordCount("flink", 1L), new WordCount("hive", 1L),
new WordCount("flink", 1L), new WordCount("spark", 1L)
);
// 通过DataSet创建表
Table inputTable = tableEnv.fromDataSet(inputDataSet);
// 3. 数据转换-transformation
// 调用Table API进行操作,将SQL中语句进行拆解
/*
SELECT word, SUM(counts) AS counts FROM word_count GROUP BY word HAVING counts >= 2 ORDER BY counts DESC
*/
Table wcTable = inputTable
.groupBy("word") // 分组
.select("word, counts.sum as counts") // sum
.filter("counts >= 2") // 过滤
.orderBy("counts.desc");
// 将表转成DataSet
DataSet<WordCount> resultDataSet = tableEnv.toDataSet(wcTable, WordCount.class);
// 4. 数据终端-sink
resultDataSet.printToErr();
}
}

以上是关于从0到1Flink的成长之路-Table API& SQL入门案例的主要内容,如果未能解决你的问题,请参考以下文章

从0到1Flink的成长之路-Table API& SQL巩固案例

从0到1Flink的成长之路-Table API& SQL入门案例

从0到1Flink的成长之路(二十一)-Flink Table API 与 SQL

从0到1Flink的成长之路-Table API& SQL发展历程

从0到1Flink的成长之路-Table API& SQL巩固案例

从0到1Flink的成长之路-Table API& SQL巩固案例