4.1flink窗口算子的trigger触发器和Evictor清理器

Posted PacosonSWJTU

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了4.1flink窗口算子的trigger触发器和Evictor清理器相关的知识,希望对你有一定的参考价值。

【README】

本文记录了 窗口算子的触发器trigger和 evictor清理器;

  • trigger触发器:决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理;
  • evictor清理器: evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素;

【1】触发器trigger

1)Trigger 接口提供了五个方法来响应不同的事件

  1. onElement() 方法在每个元素被加入窗口时调用。
  2. onEventTime() 方法在注册的 event-time timer 触发时调用。
  3. onProcessingTime() 方法在注册的 processing-time timer 触发时调用。
  4. onMerge() 方法与有状态的 trigger 相关。该方法会在两个窗口合并时, 将窗口对应 trigger 的状态进行合并,比如使用会话窗口时。
  5. 最后,clear() 方法处理在对应窗口被移除时所需的逻辑。

2)有两点需要注意:

  • 前三个方法通过返回 TriggerResult 来决定 trigger 如何应对到达窗口的事件。应对方案有以下几种:
    • CONTINUE: 什么也不做
    • FIRE: 触发计算
    • PURGE: 清空窗口内的元素
    • FIRE_AND_PURGE: 触发计算,计算结束后清空窗口内的元素
  • 上面的任意方法都可以用来注册 processing-time 或 event-time timer。

3)触发(Fire)与清除(Purge)
当 trigger 认定一个窗口可以被计算时,它就会触发,也就是返回 FIRE 或 FIRE_AND_PURGE


【1.1】触发器代码

1)以滑动计数窗口为例

DataStream<Tuple2<String, BigDecimal>> windowAggStream = sensorStream
                .keyBy(SensorReadingWindow::getId)// 按照id 分组
                .countWindow(5, 2) // 滑动计数窗口

countWindow() 如下:

public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) 
        return this.window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));
    
CountTrigger.of() 就是返回计数触发器;
public static <W extends Window> CountTrigger<W> of(long maxCount) 
        return new CountTrigger(maxCount);
    

 其中 maxCount等于滑动步长2,CountTrigger计数触发器定义如下:

 对于 onElement方法,当 计数值大于 maxCount时,则触发生成新窗口;而maxCount等于滑动步长2 ;具体代码如下:

public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception 
        ReducingState<Long> count = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        count.add(1L);
        if ((Long)count.get() >= this.maxCount) 
            count.clear();
            return TriggerResult.FIRE;
         else 
            return TriggerResult.CONTINUE;
        
    

即,对于 7个元素的流,会在第2 , 4, 6 个元素生成新窗口;因为滑动步长为2;


 

【2】清理器 evictor 

1)定义:Evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素。

2)Evictor 接口提供了两个方法实现此功能:

evictBefore() 包含在调用窗口函数前的逻辑,而 evictAfter() 包含在窗口函数调用之后的逻辑。 在调用窗口函数之前被移除的元素不会被窗口函数计算。

(图1 CountEvictor类定义)

3)清理器分类

Flink 内置有三个 evictor:

  1. CountEvictor: 仅记录用户指定数量的元素,一旦窗口中的元素超过这个数量,多余的元素会从窗口缓存的开头移除
  2. DeltaEvictor: 接收 DeltaFunction 和 threshold 参数,计算最后一个元素与窗口缓存中所有元素的差值, 并移除差值大于或等于 threshold 的元素。
  3. TimeEvictor: 接收 interval 参数,以毫秒表示。 它会找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - interval 小的所有元素。

 

【2.1】代码示例

对于 countWindow 计数窗口方法,其使用了 Evictor,如下:

public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) 
        return this.window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));
    

CountEvictor 定义见图1;其长度为 窗口大小;


【3】滑动计数窗口代码

1)滑动计数窗口参数

  • 窗口大小:5;
  • 滑动步长:2,即生成新窗口频率,每2条数据就会生成一个新窗口;

2)代码

/**
 * @Description 滑动计数窗口算子
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年04月17日
 */
public class WindowTest2_CountWindow 
    public static void main(String[] args) throws Exception 
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从socket读取数据
        DataStream<String> fileStream = env.readTextFile("D:\\\\workbench_idea\\\\diydata\\\\flinkdemo2\\\\src\\\\main\\\\resources\\\\sensorTimeWindow.txt");

        // 转换为 SensorReader pojo类型
        DataStream<SensorReadingWindow> sensorStream = fileStream.map(x -> 
            String[] arr = x.split(",");
            return new SensorReadingWindow(arr[0], arr[1], new BigDecimal(arr[2]));
        );

        // 滑动计数窗口,进行增量聚合来计算温度均值(采用 Tuple2 )
        DataStream<Tuple2<String, BigDecimal>> windowAggStream = sensorStream
                .keyBy(SensorReadingWindow::getId)// 按照id 分组
                .countWindow(5, 2) // 滑动计数窗口
                .aggregate(new AggregateFunction<SensorReadingWindow, Tuple3<String, BigDecimal, Integer>, Tuple2<String, BigDecimal>>() 
                    @Override
                    public Tuple3<String, BigDecimal, Integer> createAccumulator() 
                        return new Tuple3<>("", BigDecimal.ZERO, 0); // 初始值
                    
                    @Override
                    public Tuple3<String, BigDecimal, Integer> add(SensorReadingWindow sensorReading, Tuple3<String, BigDecimal, Integer> accumulator) 
                        return new Tuple3<>(sensorReading.getId(), sensorReading.getTemperature().add(accumulator.f1).setScale(2,BigDecimal.ROUND_HALF_UP), accumulator.f2+1);
                    
                    @Override
                    public Tuple2<String, BigDecimal> getResult(Tuple3<String, BigDecimal, Integer> accumulator) 
                        return new Tuple2<>(accumulator.f0, accumulator.f1.divide(new BigDecimal(accumulator.f2)).setScale(2,BigDecimal.ROUND_HALF_UP));
                    
                    @Override
                    public Tuple3<String, BigDecimal, Integer> merge(Tuple3<String, BigDecimal, Integer> a, Tuple3<String, BigDecimal, Integer> b) 
                        return new Tuple3<String, BigDecimal, Integer>(a.f0, a.f1.add(b.f1), a.f2 + b.f2);
                    
                )
                ;
        // 打印
        windowAggStream.print("slideCountWindowAggStream");
        // 执行
        env.execute("slideCountWindowAggStreamJob");
    

sensor文本:

sensor1,2022-04-17 22:07:01,36.1
sensor2,2022-04-17 22:07:02,36.2
sensor1,2022-04-17 22:07:03,36.3
sensor2,2022-04-17 22:07:04,36.4
sensor1,2022-04-17 22:07:05,36.5
sensor1,2022-04-17 22:07:06,36.6
sensor1,2022-04-17 22:07:07,36.7

打印结果:

slideCountWindowAggStream> (sensor1,36.20)
slideCountWindowAggStream> (sensor2,36.30)
slideCountWindowAggStream> (sensor1,36.38)

3)分析:

sensor1 的温度是 36.1、 36.3、 36.5、 36.6、 36.7 ;

sensor2 的温度是 36.2、36.4 ;

所以 sensor2 的温度均值是 36.3 ,打印结果没有问题

对于 sensor1 的温度均值, 又每2个生成一个新窗口,则 第1个窗口的均值是 数据 36.1 和 36.3  的均值;

第2个窗口的均值是数据36.1  36.3  36.5 36.6 4个数据的均值(四舍五入);


【4】 滑动计数窗口代码变体

把sensor文本修改为:(多了一条数据,第8条数据)

sensor1,2022-04-17 22:07:01,36.1
sensor2,2022-04-17 22:07:02,36.2
sensor1,2022-04-17 22:07:03,36.3
sensor2,2022-04-17 22:07:04,36.4
sensor1,2022-04-17 22:07:05,36.5
sensor1,2022-04-17 22:07:06,36.6
sensor1,2022-04-17 22:07:07,36.7
sensor1,2022-04-17 22:07:07,36.7

打印结果为:

slideCountWindowAggStream> (sensor1,36.20)
slideCountWindowAggStream> (sensor2,36.30)
slideCountWindowAggStream> (sensor1,36.38)
slideCountWindowAggStream> (sensor1,36.56)

分析 【4】与【3】的源代码相同,但输入数据sensor和打印结果不同,原因如下:

【4】有8条数据:按照key分组如下:

sensor1 的温度是 36.1、 36.3、 36.5、 36.6、 36.7 、36.7;
sensor2 的温度是 36.2、36.4 ;

sensor2的数据没变,下文只讨论 sensor1的数据;又每2个生成一个新窗口,则窗口列表如下:

  • 第1个窗口的均值是 数据 36.1 和 36.3  的均值 36.2;
  • 第2个窗口的均值是 数据 36.1 ,36.3,36.5, 36.6  的均值 36.375的四舍五入值 36.38;
  • 第3个窗口的均值是 数据  36.3,36.5, 36.6 , 36.7, 36.7  的均值 36.56  的四舍五入值 36.56;(注意,第3个窗口的数据项不是 36.1, 36.3,36.5, 36.6 , 36.7, 36.7,因为窗口大小为5,包不下6个数字

【小结】

经过以上分析,本文应该是把窗口计算规则讲明白了;

在进行窗口聚合时,需要关注2个参数,即 窗口大小,滑动步长

新人创作打卡挑战赛 发博客就能抽奖!定制产品红包拿不停!

以上是关于4.1flink窗口算子的trigger触发器和Evictor清理器的主要内容,如果未能解决你的问题,请参考以下文章

Flink Window API

flink 控制窗口行为(触发器移除器允许延迟将迟到的数据放入侧输出流)

Flink 窗口算子

Flink流处理之窗口算子分析

Flink 自定义触发器实现带超时时间的 countAndTimeTrigger

Flink窗口转换算子