Flink Real-Time Computing in Practice: Flink Table API & SQL Case Studies
Posted by mirson
1. Table API & SQL in Practice
Case Description
Functionality
Read data from a socket source and count the occurrences of each word.
Implementation flow
- Initialize the Table execution environment
- Apply the transformations:
1) Split each line on spaces
2) Attach an initial count of 1 to each word
3) Group the records by word
4) Sum the counts
5) Print the results
- Execute the job
Flink Table API implementation
StreamTableApiApplication, code:
```java
// Get the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
        .useOldPlanner().inStreamingMode().build();
// Get the Table execution environment
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
// Connect to the data source
DataStreamSource<String> lines = env.socketTextStream("10.10.20.15", 9922);
// Split each line into words and flatten
SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String line, Collector<String> out) throws Exception {
        Arrays.stream(line.split(" ")).forEach(out::collect);
    }
});
// Convert the DataStream into a Table; the default field name is f0, so name the field "word"
Table table = tabEnv.fromDataStream(words, "word");
// Group by word and aggregate
Table resultTable = table.groupBy("word").select("word, word.count as counts");
// In stream processing, data arrives continuously and the counts are updated,
// so toRetractStream must be used instead of toAppendStream:
// DataStream<WordCount> wordCountDataStream = tabEnv.toAppendStream(resultTable, WordCount.class);
// wordCountDataStream.printToErr("toAppendStream>>>");
DataStream<Tuple2<Boolean, WordCount>> wordCountDataStream =
        tabEnv.toRetractStream(resultTable, WordCount.class);
wordCountDataStream.printToErr("toRetractStream>>>");
env.execute();
```
Testing:
Open a socket input and type some strings:

```shell
[root@flink1 flink-1.11.2]# nc -lk 9922
```
Flink Table SQL implementation
Code, in the StreamTableSqlApplication class:
```java
// Get the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
        .useOldPlanner().inStreamingMode().build();
// Get the Table execution environment
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
// Connect to the data source
DataStreamSource<String> lines = env.socketTextStream("10.10.20.15", 9922);
// Split each line into words and flatten
SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String line, Collector<String> out) throws Exception {
        Arrays.stream(line.split(" ")).forEach(out::collect);
    }
});
// Register the DataStream as a table; the default field name is f0, so name the field "word"
tabEnv.registerDataStream("t_wordcount", words, "word");
// Group by word and aggregate via SQL
Table resultTable = tabEnv.sqlQuery("select word, count(1) as counts from t_wordcount group by word");
DataStream<Tuple2<Boolean, WordCount>> wordCountDataStream =
        tabEnv.toRetractStream(resultTable, WordCount.class);
wordCountDataStream.printToErr("toRetractStream>>>");
env.execute();
```
2. Flink SQL Tumbling Windows in Practice
Flink SQL window overview
Flink SQL supports two main kinds of window aggregation: Window aggregation and Over aggregation; this section covers Window aggregation. Window aggregation can define windows on either of two time attributes: event time and processing time. For each time attribute, three window types are supported: tumbling windows (TUMBLE), sliding windows (HOP), and session windows (SESSION).
Case description
To count how many users clicked a given page within the last minute, define a 1-minute window, collect the data that falls into it, and compute over that window.
Test data:
| Username | URL                   | Access time          |
| ------ | --------------------- | -------------------- |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:00:00 |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:00:10 |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:00:49 |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:01:05 |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:01:58 |
| 李四 | http://taobao.com/xxx | 2021-05-10 10:02:10 |
Tumbling window usage
Tumbling windows are defined with the Tumble class, which provides three methods:
- over: defines the window length
- on: the time field used to group (for time-interval windows) or sort (for row-count windows)
- as: assigns an alias, which must appear in the subsequent groupBy
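As an illustrative sketch (not code from the original article), the same per-user 1-minute count could be written with these three methods using the string-based Table API; the table name `clicks` and its fields are assumed here:

```java
// Hypothetical sketch: assumes a Table "clicks" with fields username, click_url
// and an event-time attribute ts (names are illustrative).
Table result = clicks
        .window(Tumble.over("1.minutes").on("ts").as("w"))  // over = length, on = time field, as = alias
        .groupBy("w, username")                             // the alias must appear in groupBy
        .select("w.start, w.end, username, click_url.count");
```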
Implementation steps:
- Initialize the stream execution environment
- Use the Blink planner in streaming mode
- Create the user click event data
- Write the source data to a temporary file and get its absolute path
- Create a table and load the click event data
- Run a SQL query on the table and retrieve the result as a new table
- Convert the Table into a DataStream
- Execute the job
TumbleUserClickApplication, code:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
// Write the source data to a temporary file and get its absolute path
String contents =
        "张三,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
        "张三,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
        "张三,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
        "张三,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
        "张三,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
        "张三,http://taobao.com/xxx,2021-05-10 10:02:10\n";
String path = FileUtil.createTempFile(contents);
String ddl = "CREATE TABLE user_clicks (\n" +
        "  username varchar,\n" +
        "  click_url varchar,\n" +
        "  ts TIMESTAMP(3),\n" +
        "  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
        ") WITH (\n" +
        "  'connector.type' = 'filesystem',\n" +
        "  'connector.path' = '" + path + "',\n" +
        "  'format.type' = 'csv'\n" +
        ")";
tabEnv.sqlUpdate(ddl);
// Query the table and retrieve the result as a new table
String query = "SELECT\n" +
        "  TUMBLE_START(ts, INTERVAL '1' MINUTE),\n" +
        "  TUMBLE_END(ts, INTERVAL '1' MINUTE),\n" +
        "  username,\n" +
        "  COUNT(click_url)\n" +
        "FROM user_clicks\n" +
        "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username";
Table result = tabEnv.sqlQuery(query);
tabEnv.toAppendStream(result, Row.class).print();
env.execute();
```
The tumbling window is 1 minute long, with a watermark delay of 2 seconds.
Output:

```
4> 2021-05-10T10:00,2021-05-10T10:01,张三,3
4> 2021-05-10T10:01,2021-05-10T10:02,张三,2
4> 2021-05-10T10:02,2021-05-10T10:03,张三,1
```
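The window boundaries in this output follow a simple rule: a timestamp falls into the tumbling window that starts at the timestamp rounded down to a multiple of the window size. A small plain-Java sketch, independent of Flink (class and method names are illustrative):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class TumbleDemo {
    // Start of the tumbling window containing ts (window size given in seconds)
    static LocalDateTime tumbleStart(LocalDateTime ts, long sizeSeconds) {
        long epoch = ts.toEpochSecond(ZoneOffset.UTC);
        return LocalDateTime.ofEpochSecond(epoch - epoch % sizeSeconds, 0, ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // 10:00:49 falls in the 10:00 window; 10:01:05 starts the next window
        System.out.println(tumbleStart(LocalDateTime.parse("2021-05-10T10:00:49"), 60)); // 2021-05-10T10:00
        System.out.println(tumbleStart(LocalDateTime.parse("2021-05-10T10:01:05"), 60)); // 2021-05-10T10:01
    }
}
```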
3. Flink SQL Sliding Windows in Practice
Implementation steps
- Initialize the stream execution environment
- Use the Blink planner in streaming mode
- Create the user click event data
- Write the source data to a temporary file and get its absolute path
- Create a table and load the click event data
- Run a SQL query on the table and retrieve the result as a new table
- Convert the Table into a DataStream
- Execute the job
Implementation code
HopUserClickApplication:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
// Write the source data to a temporary file and get its absolute path
String contents =
        "张三,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
        "张三,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
        "张三,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
        "张三,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
        "张三,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
        "张三,http://taobao.com/xxx,2021-05-10 10:02:10\n";
String path = FileUtil.createTempFile(contents);
String ddl = "CREATE TABLE user_clicks (\n" +
        "  username varchar,\n" +
        "  click_url varchar,\n" +
        "  ts TIMESTAMP(3),\n" +
        "  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
        ") WITH (\n" +
        "  'connector.type' = 'filesystem',\n" +
        "  'connector.path' = '" + path + "',\n" +
        "  'format.type' = 'csv'\n" +
        ")";
tabEnv.sqlUpdate(ddl);
// Query the table; every 30 seconds, count the data of the past 1 minute
String query = "SELECT\n" +
        "  HOP_START(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),\n" +
        "  HOP_END(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),\n" +
        "  username,\n" +
        "  COUNT(click_url)\n" +
        "FROM user_clicks\n" +
        "GROUP BY HOP(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username";
Table result = tabEnv.sqlQuery(query);
tabEnv.toAppendStream(result, Row.class).print();
env.execute();
```
Every 30 seconds, count the user clicks of the past 1 minute.
Output:

```
4> 2021-05-10T09:59:30,2021-05-10T10:00:30,张三,2
4> 2021-05-10T10:00,2021-05-10T10:01,张三,3
4> 2021-05-10T10:00:30,2021-05-10T10:01:30,张三,2
4> 2021-05-10T10:01,2021-05-10T10:02,张三,2
4> 2021-05-10T10:01:30,2021-05-10T10:02:30,张三,2
4> 2021-05-10T10:02,2021-05-10T10:03,张三,1
```
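With a 1-minute size and a 30-second slide, each event belongs to size/slide = 2 overlapping windows, whose start times are the slide-aligned points in the interval (ts - size, ts]. A plain-Java sketch of that assignment, independent of Flink (names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class HopDemo {
    // Start times (in epoch seconds) of all hop windows that contain ts
    static List<Long> hopStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // latest window start at or before ts, aligned to the slide
        long start = ts - (ts % slide);
        // walk backwards while the window [start, start + size) still contains ts
        while (start + size > ts) {
            starts.add(0, start);
            start -= slide;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Event at 10:02:10 = 36130 seconds into the day, size 60 s, slide 30 s:
        // it lands in the windows starting at 10:01:30 (36090) and 10:02:00 (36120),
        // matching the last two output rows above.
        System.out.println(hopStarts(36_130L, 60, 30)); // [36090, 36120]
    }
}
```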
4. Flink SQL Session Windows in Practice
Implementation steps
- Initialize the stream execution environment
- Use the Blink planner in streaming mode
- Create the user click event data
- Write the source data to a temporary file and get its absolute path
- Create a table and load the click event data
- Run a SQL query on the table and retrieve the result as a new table
- Convert the Table into a DataStream
- Execute the job
Code: SessionUserClickApplication
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
// Write the source data to a temporary file and get its absolute path
String contents =
        "张三,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
        "张三,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
        "张三,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
        "张三,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
        "张三,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
        "张三,http://taobao.com/xxx,2021-05-10 10:02:10\n";
String path = FileUtil.createTempFile(contents);
String ddl = "CREATE TABLE user_clicks (\n" +
        "  username varchar,\n" +
        "  click_url varchar,\n" +
        "  ts TIMESTAMP(3),\n" +
        "  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
        ") WITH (\n" +
        "  'connector.type' = 'filesystem',\n" +
        "  'connector.path' = '" + path + "',\n" +
        "  'format.type' = 'csv'\n" +
        ")";
tabEnv.sqlUpdate(ddl);
// Query the table; clicks separated by more than 30 seconds start a new session
String query = "SELECT\n" +
        "  SESSION_START(ts, INTERVAL '30' SECOND),\n" +
        "  SESSION_END(ts, INTERVAL '30' SECOND),\n" +
        "  username,\n" +
        "  COUNT(click_url)\n" +
        "FROM user_clicks\n" +
        "GROUP BY SESSION(ts, INTERVAL '30' SECOND), username";
Table result = tabEnv.sqlQuery(query);
tabEnv.toAppendStream(result, Row.class).print();
env.execute();
```
Clicks are grouped into sessions, and a session ends after 30 seconds with no activity.
Output:

```
4> 2021-05-10T10:00,2021-05-10T10:00:40,张三,2
4> 2021-05-10T10:00:49,2021-05-10T10:01:35,张三,2
4> 2021-05-10T10:01:58,2021-05-10T10:02:40,张三,2
```
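The three session boundaries above can be reproduced with a plain-Java sketch that starts a new session whenever the gap to the previous event exceeds 30 seconds (illustrative, not Flink code):

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

public class SessionDemo {
    // Group sorted event times into sessions separated by more than `gap`
    static List<List<LocalDateTime>> sessions(List<LocalDateTime> events, Duration gap) {
        List<List<LocalDateTime>> result = new ArrayList<>();
        List<LocalDateTime> current = new ArrayList<>();
        for (LocalDateTime t : events) {
            if (!current.isEmpty()
                    && Duration.between(current.get(current.size() - 1), t).compareTo(gap) > 0) {
                result.add(current);               // gap exceeded: close the current session
                current = new ArrayList<>();
            }
            current.add(t);
        }
        if (!current.isEmpty()) result.add(current);
        return result;
    }

    public static void main(String[] args) {
        List<LocalDateTime> clicks = List.of(
                LocalDateTime.parse("2021-05-10T10:00:00"),
                LocalDateTime.parse("2021-05-10T10:00:10"),
                LocalDateTime.parse("2021-05-10T10:00:49"),
                LocalDateTime.parse("2021-05-10T10:01:05"),
                LocalDateTime.parse("2021-05-10T10:01:58"),
                LocalDateTime.parse("2021-05-10T10:02:10"));
        // Three sessions of two clicks each, matching the three output rows;
        // SESSION_END is the last event of each session plus the 30-second gap.
        System.out.println(sessions(clicks, Duration.ofSeconds(30)).size()); // 3
    }
}
```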
This article was created and shared by mirson. For further discussion, join QQ group 19310171 or visit www.softart.cn.