Big Data Flink 2021 (35): Table and SQL, Case 2
Posted by Lansonli
Case 2
Requirement
Count the words in a DataStream in two ways: once with SQL and once with the Table API.
Implementation: SQL
package cn.itcast.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Author itcast
 * Desc Word count over a DataStream using a SQL query
 */
public class FlinkSQL_Table_Demo02 {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. Source
        DataStream<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 1)
        );

        //3. Register the DataStream as a temporary view named WordCount
        tEnv.createTemporaryView("WordCount", input, $("word"), $("frequency"));

        //4. Run the query
        Table resultTable = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");

        //5. Output the result
        //toAppendStream cannot be used here. It fails with:
        //"toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate"
        //DataStream<WC> resultDS = tEnv.toAppendStream(resultTable, WC.class);
        //A grouped aggregate updates earlier results, so convert to a retract stream instead.
        //The Boolean flag is true for an added row and false for a retracted row.
        DataStream<Tuple2<Boolean, WC>> resultDS = tEnv.toRetractStream(resultTable, WC.class);
        resultDS.print();

        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WC {
        public String word;
        public long frequency;
    }
}
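The SQL listing has to use toRetractStream because a grouped SUM revises results it has already emitted. To make the Boolean flag in Tuple2<Boolean, WC> concrete, here is a minimal plain-Java sketch of how a retract stream encodes those revisions. It is a simplified model, not Flink code: the class RetractStreamSketch and the method simulate are hypothetical names, each word is assumed to carry a frequency of 1 as in the sample data, and elements are assumed to be processed one at a time in input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a retract stream for a grouped SUM (hypothetical helper, not a Flink API). */
public class RetractStreamSketch {

    /** Returns (flag,word,sum) messages: false retracts an old result, true adds a new one. */
    public static List<String> simulate(List<String> words) {
        Map<String, Long> sums = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String word : words) {
            Long previous = sums.get(word);
            if (previous != null) {
                // A result for this key was emitted before, so retract it first.
                out.add("(false," + word + "," + previous + ")");
            }
            // Assumption: every input element has frequency 1, as in the sample data.
            long updated = (previous == null ? 0L : previous) + 1L;
            sums.put(word, updated);
            // Emit the new result for this key.
            out.add("(true," + word + "," + updated + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        simulate(Arrays.asList("Hello", "World", "Hello")).forEach(System.out::println);
    }
}
```

For the three sample elements the model emits four messages: an add for Hello with 1, an add for World with 1, a retraction of Hello with 1, and an add for Hello with 2. An append-only stream has no way to express the third message, which is why toAppendStream is rejected for this query. In a real job the relative order of messages for different keys can vary with parallelism.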
Implementation: Table API
package cn.itcast.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Author itcast
 * Desc Word count over a DataStream using the Table API
 */
public class FlinkSQL_Table_Demo03 {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. Source
        DataStream<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 1)
        );

        //3. Convert the DataStream into a Table
        Table table = tEnv.fromDataStream(input);

        //4. Run the query: group by word, sum the frequencies,
        //   then keep only the words whose total equals 2
        Table resultTable = table
                .groupBy($("word"))
                .select($("word"), $("frequency").sum().as("frequency"))
                .filter($("frequency").isEqual(2));

        //5. Output the result as a retract stream (true = added row, false = retracted row)
        DataStream<Tuple2<Boolean, WC>> resultDS = tEnv.toRetractStream(resultTable, WC.class);
        resultDS.print();

        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WC {
        public String word;
        public long frequency;
    }
}
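The Table API listing filters after the aggregation, so the filter sees every intermediate sum, not just the final one. The following plain-Java sketch models that under stated assumptions: the class FilteredChangelogSketch and the method simulate are hypothetical names, each word counts as frequency 1, elements are processed one at a time in input order, and a change message is forwarded only when the row it carries satisfies the predicate. It illustrates the idea and is not Flink's implementation; the exact messages a real job prints may depend on the Flink version and planner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of filtering a grouped-SUM changelog (hypothetical helper, not a Flink API). */
public class FilteredChangelogSketch {

    /** Forwards only the change messages whose sum equals the wanted value. */
    public static List<String> simulate(List<String> words, long wanted) {
        Map<String, Long> sums = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String word : words) {
            Long previous = sums.get(word);
            // Assumption: every input element has frequency 1, as in the sample data.
            long updated = (previous == null ? 0L : previous) + 1L;
            sums.put(word, updated);
            if (previous != null && previous == wanted) {
                // The old row passed the filter, so its retraction passes too.
                out.add("(false," + word + "," + previous + ")");
            }
            if (updated == wanted) {
                // The new row passes the filter.
                out.add("(true," + word + "," + updated + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        simulate(Arrays.asList("Hello", "World", "Hello"), 2L).forEach(System.out::println);
    }
}
```

With the sample data and a wanted value of 2, the model forwards a single add message for Hello with 2. The intermediate Hello with 1 and the row for World never satisfy the predicate, so neither their adds nor their retractions get through. If a third Hello arrived, the model would retract Hello with 2, since the sum would move on to 3.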