From 0 to 1 on the Flink Growth Path: Table API & SQL Consolidation Case Study

Posted by 熊老二-


Batch Processing

Data
The file sql_students.txt stores student records:

id,name,classname

1,张三,1班
2,李四,1班
3,王五,2班
4,赵六,2班
5,田七,2班

The file sql_scores.txt stores exam scores:

id,chinese,math,english
1,100,90,80
2,97,87,74
3,70,50,43
4,100,99,99
5,80,81,82

Requirements

1. Initialize the Flink execution environment
2. Read the two tables from the files sql_students.txt and sql_scores.txt
3. Preprocess the data: join the two tables on the id field into a single DataSet
4. Map the DataSet to a Table and run SQL that computes each class's average score per subject, plus the average of the three-subject total
5. Write the result to a sink (the demos below print it to stderr)
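Before looking at the Flink code, it helps to know what the correct answer is. The following plain-Java sketch (no Flink dependency; the class and method names are my own) computes the same aggregates over the sample data, assuming that AVG over INT columns uses integer division, as Flink's legacy batch planner does:

```java
import java.util.Arrays;

// Hand-computed expectation for the Flink jobs below, using integer
// division to mirror SQL AVG over INT columns.
public class ExpectedAverages {

    /** Returns {avg_chinese, avg_math, avg_english, avg_total} for one class. */
    public static int[] classAverages(int[][] scores) {
        int c = 0, m = 0, e = 0, n = scores.length;
        for (int[] s : scores) {
            c += s[0]; // chinese
            m += s[1]; // math
            e += s[2]; // english
        }
        // Integer division truncates, as AVG(INT) does in the legacy planner.
        return new int[]{c / n, m / n, e / n, (c + m + e) / n};
    }

    public static void main(String[] args) {
        // 1班: students 1 (100,90,80) and 2 (97,87,74)
        System.out.println(Arrays.toString(classAverages(
                new int[][]{{100, 90, 80}, {97, 87, 74}})));            // [98, 88, 77, 264]
        // 2班: students 3 (70,50,43), 4 (100,99,99), 5 (80,81,82)
        System.out.println(Arrays.toString(classAverages(
                new int[][]{{70, 50, 43}, {100, 99, 99}, {80, 81, 82}}))); // [83, 76, 74, 234]
    }
}
```

So the jobs should emit 1班 first (avg_total 264), then 2班 (avg_total 234).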

Whether the Table API or SQL is used, the final result is encapsulated in the entity class ScoreInfo, shown below.

package xx.xxxxx.flink.batch;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class ScoreInfo {
    public String classroom;
    public Integer avg_chinese;
    public Integer avg_math;
    public Integer avg_english;
    public Integer avg_total;
}
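If Lombok is not on the classpath, an equivalent plain POJO can be written by hand (a sketch; the toString and main are my own additions). Flink's POJO type extraction only needs a public no-arg constructor plus public fields or getters/setters, which the public fields here already satisfy:

```java
// A Lombok-free equivalent of ScoreInfo (sketch).
public class ScoreInfo {
    public String classroom;
    public Integer avg_chinese;
    public Integer avg_math;
    public Integer avg_english;
    public Integer avg_total;

    // No-arg constructor: required so Flink can instantiate the POJO.
    public ScoreInfo() {}

    public ScoreInfo(String classroom, Integer avgChinese, Integer avgMath,
                     Integer avgEnglish, Integer avgTotal) {
        this.classroom = classroom;
        this.avg_chinese = avgChinese;
        this.avg_math = avgMath;
        this.avg_english = avgEnglish;
        this.avg_total = avgTotal;
    }

    @Override
    public String toString() {
        return classroom + ": chinese=" + avg_chinese + ", math=" + avg_math
                + ", english=" + avg_english + ", total=" + avg_total;
    }

    public static void main(String[] args) {
        System.out.println(new ScoreInfo("1班", 98, 88, 77, 264));
    }
}
```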

SQL Implementation
Using the Flink SQL API to compute each class's per-subject averages and the average of the three-subject total:

package xx.xxxxxx.flink.batch;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;

/**
 * Flink SQL implementation: per-subject class averages and the average
 * of the three-subject total.
 */
public class FlinkBatchSQLDemo {

    public static void main(String[] args) throws Exception {
        // 1. Set up the environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        // 2. Sources
        // student: 1,张三,1班 -> id, name, classroom
        CsvTableSource studentSource = CsvTableSource.builder()
            .path("datas/sql_students.txt") // file path
            .field("id", DataTypes.INT())
            .field("name", DataTypes.STRING())
            .field("classroom", DataTypes.STRING())
            .fieldDelimiter(",")
            .lineDelimiter("\n")
            .ignoreParseErrors() // skip unparsable lines (e.g. a header row)
            .build();
        DataSet<Row> studentDataSet = studentSource.getDataSet(env);

        // score: 1,100,90,80 -> id, chinese, math, english
        CsvTableSource scoreSource = CsvTableSource.builder()
            .path("datas/sql_scores.txt") // file path
            .field("id", DataTypes.INT())
            .field("chinese", DataTypes.INT())
            .field("math", DataTypes.INT())
            .field("english", DataTypes.INT())
            .fieldDelimiter(",")
            .lineDelimiter("\n")
            .ignoreParseErrors() // skip unparsable lines (e.g. a header row)
            .build();
        DataSet<Row> scoreDataSet = scoreSource.getDataSet(env);

        // Register the datasets as tables
        tableEnv.createTemporaryView("t_student", studentDataSet);
        tableEnv.createTemporaryView("t_score", scoreDataSet);

        // Run the SQL: per-subject class averages and the average of the three-subject total
        Table resultTable = tableEnv.sqlQuery(
            "WITH tmp AS ( "
            + "SELECT t1.name, t1.classroom, t2.chinese, t2.math, t2.english "
            + "FROM t_student t1 JOIN t_score t2 "
            + "ON t1.id = t2.id "
            + ") "
            + "SELECT "
            + "classroom, AVG(chinese) AS avg_chinese, "
            + "AVG(math) AS avg_math, AVG(english) AS avg_english, "
            + "AVG(chinese + math + english) AS avg_total "
            + "FROM tmp "
            + "GROUP BY classroom "
            + "ORDER BY avg_total DESC "
        );

        // Convert the Table back into a DataSet of ScoreInfo POJOs
        DataSet<ScoreInfo> resultDataSet = tableEnv.toDataSet(resultTable, ScoreInfo.class);

        // 3. Sink
        resultDataSet.printToErr();
    }
}
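To see what the WITH clause's intermediate table tmp contains, here is a Flink-free sketch of the same inner join on id over the sample rows (class and method names are my own):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the inner join in the WITH clause: each student row is matched
// with the score row sharing the same id.
public class JoinSketch {

    public static List<String> joinedRows() {
        Map<Integer, String[]> students = new LinkedHashMap<>(); // id -> {name, classroom}
        students.put(1, new String[]{"张三", "1班"});
        students.put(2, new String[]{"李四", "1班"});
        students.put(3, new String[]{"王五", "2班"});
        students.put(4, new String[]{"赵六", "2班"});
        students.put(5, new String[]{"田七", "2班"});

        Map<Integer, int[]> scores = new LinkedHashMap<>(); // id -> {chinese, math, english}
        scores.put(1, new int[]{100, 90, 80});
        scores.put(2, new int[]{97, 87, 74});
        scores.put(3, new int[]{70, 50, 43});
        scores.put(4, new int[]{100, 99, 99});
        scores.put(5, new int[]{80, 81, 82});

        List<String> rows = new ArrayList<>();
        for (Map.Entry<Integer, String[]> e : students.entrySet()) {
            int[] s = scores.get(e.getKey());
            if (s != null) { // inner join: keep only ids present in both tables
                rows.add(e.getValue()[0] + "," + e.getValue()[1] + ","
                        + s[0] + "," + s[1] + "," + s[2]);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        joinedRows().forEach(System.out::println); // 5 rows, e.g. 张三,1班,100,90,80
    }
}
```

The aggregation then groups these five joined rows by classroom.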

Table API Implementation
Using the Flink Table API to compute each class's per-subject averages and the average of the three-subject total:

package xx.xxxxx.flink.batch;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

/**
 * Flink Table API implementation: per-subject class averages and the average
 * of the three-subject total.
 */
public class FlinkBatchTableDemo {

    public static void main(String[] args) throws Exception {
        // 1. Set up the environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        // 2. Sources: read the text files
        DataSource<String> studentDataSet = env.readTextFile("datas/sql_students.txt");
        DataSource<String> scoreDataSet = env.readTextFile("datas/sql_scores.txt");

        // 3. Transformations
        // student: 1,张三,1班 -> id, name, classroom
        MapOperator<String, Tuple3<Integer, String, String>> studentOperator = studentDataSet
            // Guard against a header row: keep only lines starting with a digit,
            // so Integer.parseInt below cannot fail on "id,name,..."
            .filter(line -> !line.trim().isEmpty() && Character.isDigit(line.trim().charAt(0)))
            .map(
                new MapFunction<String, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> map(String line) throws Exception {
                        String[] array = line.trim().split(",");
                        return Tuple3.of(Integer.parseInt(array[0]), array[1], array[2]);
                    }
                }
            );
        // score: 1,100,90,80 -> id, chinese, math, english
        MapOperator<String, Tuple4<Integer, Integer, Integer, Integer>> scoreOperator = scoreDataSet
            // Same header guard as above
            .filter(line -> !line.trim().isEmpty() && Character.isDigit(line.trim().charAt(0)))
            .map(
                new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
                    @Override
                    public Tuple4<Integer, Integer, Integer, Integer> map(String line) throws Exception {
                        String[] array = line.trim().split(",");
                        return Tuple4.of(
                            Integer.parseInt(array[0]), Integer.parseInt(array[1]),
                            Integer.parseInt(array[2]), Integer.parseInt(array[3])
                        );
                    }
                }
            );

        // Join the two datasets on id (field 0 of each tuple), keeping the
        // student's id/name/classroom and the score's chinese/math/english
        DataSet<Tuple6<Integer, String, String, Integer, Integer, Integer>> dataset = studentOperator
            .join(scoreOperator).where(0).equalTo(0)
            .projectFirst(0, 1, 2).projectSecond(1, 2, 3);

        // Register the joined dataset as a table
        tableEnv.createTemporaryView(
            "t_student_score", dataset, "id, name, classroom, chinese, math, english"
        );

        // Build the query with the Table API
        Table resultTable = tableEnv
            .from("t_student_score").groupBy("classroom")
            .select(
                "classroom, chinese.avg AS avg_chinese, math.avg AS avg_math, "
                + "english.avg AS avg_english, (chinese + math + english).avg AS avg_total "
            )
            .orderBy("avg_total.desc");

        // Convert the Table back into a DataSet of ScoreInfo POJOs
        DataSet<ScoreInfo> resultDataSet = tableEnv.toDataSet(resultTable, ScoreInfo.class);

        // 4. Sink
        resultDataSet.printToErr();
    }
}
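One subtlety shared by both queries: avg_total is the average of the row-wise total, not the sum of the three per-subject averages. Under integer AVG the two can differ by truncation, as this small sketch (class and method names are my own) shows for 2班's sample rows:

```java
import java.util.Arrays;

// Demonstrates why AVG(chinese + math + english) is not necessarily equal to
// AVG(chinese) + AVG(math) + AVG(english) under integer division.
public class AvgTotalNote {

    /** Returns {sum of per-subject integer averages, integer average of row totals}. */
    public static int[] compare(int[][] rows) {
        int c = 0, m = 0, e = 0, t = 0;
        for (int[] r : rows) {
            c += r[0];
            m += r[1];
            e += r[2];
            t += r[0] + r[1] + r[2];
        }
        int n = rows.length;
        return new int[]{c / n + m / n + e / n, t / n};
    }

    public static void main(String[] args) {
        // 2班 sample rows: (chinese, math, english)
        int[][] rows = {{70, 50, 43}, {100, 99, 99}, {80, 81, 82}};
        System.out.println(Arrays.toString(compare(rows))); // [233, 234]
    }
}
```

Averaging the row totals loses less precision, which is why the queries compute avg_total that way.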
