4.1flink窗口算子的trigger触发器和Evictor清理器
Posted PacosonSWJTU
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了4.1flink窗口算子的trigger触发器和Evictor清理器相关的知识,希望对你有一定的参考价值。
【README】
本文记录了 窗口算子的触发器trigger和 evictor清理器;
- trigger触发器:决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理;
- evictor清理器: evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素;
【1】触发器trigger
1)Trigger 接口提供了五个方法来响应不同的事件:
- onElement() 方法在每个元素被加入窗口时调用。
- onEventTime() 方法在注册的 event-time timer 触发时调用。
- onProcessingTime() 方法在注册的 processing-time timer 触发时调用。
- onMerge() 方法与有状态的 trigger 相关。该方法会在两个窗口合并时, 将窗口对应 trigger 的状态进行合并,比如使用会话窗口时。
- 最后,clear() 方法处理在对应窗口被移除时所需的逻辑。
2)有两点需要注意:
- 前三个方法通过返回 TriggerResult 来决定 trigger 如何应对到达窗口的事件。应对方案有以下几种:
- CONTINUE: 什么也不做
- FIRE: 触发计算
- PURGE: 清空窗口内的元素
- FIRE_AND_PURGE: 触发计算,计算结束后清空窗口内的元素
- 上面的任意方法都可以用来注册 processing-time 或 event-time timer。
3)触发(Fire)与清除(Purge)
当 trigger 认定一个窗口可以被计算时,它就会触发,也就是返回 FIRE 或 FIRE_AND_PURGE。
【1.1】触发器代码
1)以滑动计数窗口为例
DataStream<Tuple2<String, BigDecimal>> windowAggStream = sensorStream
.keyBy(SensorReadingWindow::getId)// 按照id 分组
.countWindow(5, 2) // 滑动计数窗口
countWindow() 如下:
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide)
return this.window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));
CountTrigger.of() 就是返回计数触发器;
public static <W extends Window> CountTrigger<W> of(long maxCount)
return new CountTrigger(maxCount);
其中 maxCount等于滑动步长2,CountTrigger计数触发器定义如下:
对于 onElement方法,当 计数值大于 maxCount时,则触发生成新窗口;而maxCount等于滑动步长2 ;具体代码如下:
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception
ReducingState<Long> count = (ReducingState)ctx.getPartitionedState(this.stateDesc);
count.add(1L);
if ((Long)count.get() >= this.maxCount)
count.clear();
return TriggerResult.FIRE;
else
return TriggerResult.CONTINUE;
即,对于 7个元素的流,会在第2 , 4, 6 个元素生成新窗口;因为滑动步长为2;
【2】清理器 evictor
1)定义:Evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素。
2)Evictor 接口提供了两个方法实现此功能:
evictBefore() 包含在调用窗口函数前的逻辑,而 evictAfter() 包含在窗口函数调用之后的逻辑。 在调用窗口函数之前被移除的元素不会被窗口函数计算。
(图1 CountEvictor类定义)
3)清理器分类
Flink 内置有三个 evictor:
- CountEvictor: 仅记录用户指定数量的元素,一旦窗口中的元素超过这个数量,多余的元素会从窗口缓存的开头移除;
- DeltaEvictor: 接收 DeltaFunction 和 threshold 参数,计算最后一个元素与窗口缓存中所有元素的差值, 并移除差值大于或等于 threshold 的元素。
- TimeEvictor: 接收 interval 参数,以毫秒表示。 它会找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - interval 小的所有元素。
【2.1】代码示例
对于 countWindow 计数窗口方法,其使用了 Evictor,如下:
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide)
return this.window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));
CountEvictor 定义见图1;其长度为 窗口大小;
【3】滑动计数窗口代码
1)滑动计数窗口参数
- 窗口大小:5;
- 滑动步长:2,即生成新窗口频率,每2条数据就会生成一个新窗口;
2)代码
/**
* @Description 滑动计数窗口算子
* @author xiao tang
* @version 1.0.0
* @createTime 2022年04月17日
*/
public class WindowTest2_CountWindow
public static void main(String[] args) throws Exception
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从socket读取数据
DataStream<String> fileStream = env.readTextFile("D:\\\\workbench_idea\\\\diydata\\\\flinkdemo2\\\\src\\\\main\\\\resources\\\\sensorTimeWindow.txt");
// 转换为 SensorReader pojo类型
DataStream<SensorReadingWindow> sensorStream = fileStream.map(x ->
String[] arr = x.split(",");
return new SensorReadingWindow(arr[0], arr[1], new BigDecimal(arr[2]));
);
// 滑动计数窗口,进行增量聚合来计算温度均值(采用 Tuple2 )
DataStream<Tuple2<String, BigDecimal>> windowAggStream = sensorStream
.keyBy(SensorReadingWindow::getId)// 按照id 分组
.countWindow(5, 2) // 滑动计数窗口
.aggregate(new AggregateFunction<SensorReadingWindow, Tuple3<String, BigDecimal, Integer>, Tuple2<String, BigDecimal>>()
@Override
public Tuple3<String, BigDecimal, Integer> createAccumulator()
return new Tuple3<>("", BigDecimal.ZERO, 0); // 初始值
@Override
public Tuple3<String, BigDecimal, Integer> add(SensorReadingWindow sensorReading, Tuple3<String, BigDecimal, Integer> accumulator)
return new Tuple3<>(sensorReading.getId(), sensorReading.getTemperature().add(accumulator.f1).setScale(2,BigDecimal.ROUND_HALF_UP), accumulator.f2+1);
@Override
public Tuple2<String, BigDecimal> getResult(Tuple3<String, BigDecimal, Integer> accumulator)
return new Tuple2<>(accumulator.f0, accumulator.f1.divide(new BigDecimal(accumulator.f2)).setScale(2,BigDecimal.ROUND_HALF_UP));
@Override
public Tuple3<String, BigDecimal, Integer> merge(Tuple3<String, BigDecimal, Integer> a, Tuple3<String, BigDecimal, Integer> b)
return new Tuple3<String, BigDecimal, Integer>(a.f0, a.f1.add(b.f1), a.f2 + b.f2);
)
;
// 打印
windowAggStream.print("slideCountWindowAggStream");
// 执行
env.execute("slideCountWindowAggStreamJob");
sensor文本:
sensor1,2022-04-17 22:07:01,36.1 sensor2,2022-04-17 22:07:02,36.2 sensor1,2022-04-17 22:07:03,36.3 sensor2,2022-04-17 22:07:04,36.4 sensor1,2022-04-17 22:07:05,36.5 sensor1,2022-04-17 22:07:06,36.6 sensor1,2022-04-17 22:07:07,36.7
打印结果:
slideCountWindowAggStream> (sensor1,36.20)
slideCountWindowAggStream> (sensor2,36.30)
slideCountWindowAggStream> (sensor1,36.38)
3)分析:
sensor1 的温度是 36.1、 36.3、 36.5、 36.6、 36.7 ;
sensor2 的温度是 36.2、36.4 ;
所以 sensor2 的温度均值是 36.3 ,打印结果没有问题;
对于 sensor1 的温度均值, 又每2个生成一个新窗口,则 第1个窗口的均值是 数据 36.1 和 36.3 的均值;
第2个窗口的均值是数据36.1 36.3 36.5 36.6 4个数据的均值(四舍五入);
【4】 滑动计数窗口代码变体
把sensor文本修改为:(多了一条数据,第8条数据)
sensor1,2022-04-17 22:07:01,36.1
sensor2,2022-04-17 22:07:02,36.2
sensor1,2022-04-17 22:07:03,36.3
sensor2,2022-04-17 22:07:04,36.4
sensor1,2022-04-17 22:07:05,36.5
sensor1,2022-04-17 22:07:06,36.6
sensor1,2022-04-17 22:07:07,36.7
sensor1,2022-04-17 22:07:07,36.7
打印结果为:
slideCountWindowAggStream> (sensor1,36.20)
slideCountWindowAggStream> (sensor2,36.30)
slideCountWindowAggStream> (sensor1,36.38)
slideCountWindowAggStream> (sensor1,36.56)
分析 【4】与【3】的源代码相同,但输入数据sensor和打印结果不同,原因如下:
【4】有8条数据:按照key分组如下:
sensor1 的温度是 36.1、 36.3、 36.5、 36.6、 36.7 、36.7;
sensor2 的温度是 36.2、36.4 ;
sensor2的数据没变,下文只讨论 sensor1的数据;又每2个生成一个新窗口,则窗口列表如下:
- 第1个窗口的均值是 数据 36.1 和 36.3 的均值 36.2;
- 第2个窗口的均值是 数据 36.1 ,36.3,36.5, 36.6 的均值 36.375的四舍五入值 36.38;
- 第3个窗口的均值是 数据 36.3,36.5, 36.6 , 36.7, 36.7 的均值 36.56 的四舍五入值 36.56;(注意,第3个窗口的数据项不是 36.1, 36.3,36.5, 36.6 , 36.7, 36.7,因为窗口大小为5,包不下6个数字 )
【小结】
经过以上分析,本文应该是把窗口计算规则讲明白了;
在进行窗口聚合时,需要关注2个参数,即 窗口大小,滑动步长 ;
新人创作打卡挑战赛 发博客就能抽奖!定制产品红包拿不停!以上是关于4.1flink窗口算子的trigger触发器和Evictor清理器的主要内容,如果未能解决你的问题,请参考以下文章
flink 控制窗口行为(触发器移除器允许延迟将迟到的数据放入侧输出流)