从0到1Flink的成长之路(十九)-案例:计数窗口

Posted 熊老二-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0到1Flink的成长之路(十九)-案例:计数窗口相关的知识,希望对你有一定的参考价值。

案例:计数窗口

假设有1个滑动计数窗口,每2个元素计算1次最近4个元素的总和,窗口工作示意图如下所示:
在这里插入图片描述
上图中所示的各个窗口逻辑上是不同的窗口,但在物理上是同一个窗口。该滑动计数窗口,trigger的触发条件是元素个数达到2个(每进入_x0010_2个元素就会触发一次),evictor保留的元素个数是4个,每次计算完窗口总和后会保留剩余的元素。所以第一次触发trigger是当元素5进入,第三次触发trigger是当元素2进入,并驱逐5和2,计算剩余的4个元素的总和(22)并发送出去,保留下2,4,9,7元素供下个逻辑窗口使用。

1 需求
需求1:基于数量的滚动窗口,窗口大小window size=滑动大小slide size
统计在最近5条消息中,数字之和sum值

需求2:基于数量的滑动窗口,窗口大小window size>滑动大小slide size
每隔5条数据,统计在最近10条消息中, 数字之和sum值

2 代码实现:滚动计数窗口
窗口统计案例演示:滚动计数窗口(Tumbling Count Window),数字累加求和统计

package xx.xxxxx.flink.window.count;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* 窗口统计案例演示:滚动计数窗口(Tumbling Count Window),数字累加求和统计
*/
public class StreamTumblingCountWindow {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env:流计算执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1) ;
// 2. 数据源-source:Socket接收数据
DataStreamSource<String> inputStream = env.socketTextStream("node1.itcast.cn", 9999);
// 3. 转换处理-transformation:调用DataStream函数,处理数据
SingleOutputStreamOperator<Integer> numberStream = inputStream
// a. 过滤数据
.filter(line -> null != line && line.trim().length() > 0)
// b. 转换整数
.map(line -> Integer.parseInt(line.trim()));
// TODO:每隔5条数据对统计最近5条数据,进行累加之和
SingleOutputStreamOperator<Integer> sumStream = numberStream
.countWindowAll(5)
.sum(0) ;
// 4. 数据终端-sink
sumStream.printToErr("sum");
// 5. 触发执行-execute
env.execute(StreamTumblingCountWindow.class.getSimpleName()) ; } }

3 代码实现:滑动计数窗口
窗口统计案例演示:滑动计数窗口(Sliding Count Window),数字累加求和统计

package xx.xxxxxx.flink.window.count;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* 窗口统计案例演示:滑动计数窗口(Sliding Count Window),数字累加求和统计
*/
public class StreamSlidingCountWindow {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env:流计算执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1) ;
// 2. 数据源-source:Socket接收数据
DataStreamSource<String> inputStream = env.socketTextStream("node1.itcast.cn", 9999);
// 3. 转换处理-transformation:调用DataStream函数,处理数据
SingleOutputStreamOperator<Integer> numberStream = inputStream
// a. 过滤数据
.filter(line -> null != line && line.trim().length() > 0)
// b. 转换整数
.map(line -> Integer.parseInt(line.trim()));
// TODO:每隔5条数据对统计最近10条数据,进行累加之和
SingleOutputStreamOperator<Integer> sumStream = numberStream
.countWindowAll(10, 5)
.sum(0) ;
// 4. 数据终端-sink
sumStream.printToErr("sum");
// 5. 触发执行-execute
env.execute(StreamSlidingCountWindow.class.getSimpleName()) ; } }

以上是关于从0到1Flink的成长之路(十九)-案例:计数窗口的主要内容,如果未能解决你的问题,请参考以下文章

从0到1Flink的成长之路(二十)-案例:时间会话窗口

从0到1Flink的成长之路-Table API& SQL入门案例

从0到1Flink的成长之路-Table API& SQL入门案例

从0到1Flink的成长之路-Flink Action 综合案例

从0到1Flink的成长之路-Flink Action 综合案例

从0到1Flink的成长之路-Flink Action 综合案例-BroadcastState 动态更新