Flink自定义触发器

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink自定义触发器相关的知识,希望对你有一定的参考价值。

参考技术A

上一篇分享中介绍了Flink完成数据统计的例子,在最后提到了自定义的统计触发器,这一篇分享主要介绍一下自定义的触发器如何来实现。

一、触发器的作用
触发器的作用就是我们在窗口中,什么时候来触发我们的聚合方法。主要涉及到的就是聚合计算(AggregateFunction)中的
OUT getResult(ACC var1);
这两个方法

比如我们想要在1个小时为单位的时间窗口里,达到每分钟来刷新数据的目的,那我们就必须每分钟都要触发一次getResult方法,来把数据发送到下一个处理节点(一般来说都是Sink-->保存数据的节点)

二、触发器的的实现
实现很简单,只需要继承Trigger<Object, W>类,实现它的方法即可
例如,我们需要一个带步长的触发器:

方法调用时机如下:
onElement()方法,每个元素被添加到窗口时调用
  
onEventTime()方法,当一个已注册的事件时间计时器启动时调用
   onProcessingTime()方法,当一个已注册的处理时间计时器启动时调用
  
onMerge()方法,与状态性触发器相关,当使用会话窗口时,两个触发器对应的窗口合并时,合并两个触发器的状态。
  *最后一个clear()方法执行任何需要清除的相应窗口
上面的方法中有两个需要注意的地方:
1)第一、三通过返回一个TriggerResult来决定如何操作调用他们的事件,这些操作可以是下面操作中的一个;
CONTINUE:什么也不做
FIRE:触发计算
PURGE:清除窗口中的数据
FIRE_AND_PURGE:触发计算并清除窗口中的数据

三、自定义注册定时触发器
我们在需要在onElement中注册一个定时触发的任务

根据步长来注册下次执行的时间

然后

在onProcessingTime的时候如果步长和当前的执行时间一致,则触发计算
并再注册下一次的触发时间,直到窗口结束。

Flink 触发器

1、什么是触发器
触发器决定了一个窗口何时可以被窗口函数处理(条件满足时触发并发出信号)

每一个WindowAssigner都有一个默认的触发器,如果默认的触发器不能满足你的需要,你可以通过调用trigger(...)来指定一个自定义的触发器

触发器有5个方法来允许触发器处理不同的事件(Trigger):
onElement()方法:每个元素被添加到窗口时调用
onEventTime()方法:当一个已注册的事件时间计时器启动时调用
onProcessingTime()方法:当一个已注册的处理时间计时器启动时调用
onMerge()方法:与状态性触发器相关,当使用session window时,两个触发器对应的窗口合并时,合并两个触发器的状态。
clear() 相应窗口被清除时触发

前三个方法通过返回TriggerResult来决定如何对其调用事件进行操作。该操作可以是以下操作之一:
CONTINUE:什么也不做
FIRE:触发计算
PURGE:清除窗口中的数据
FIRE_AND_PURGE:触发计算并随后清除窗口中的元素

2、触发和清除(Fire and Purge)
一旦一个触发器决定一个窗口已经准备好进行处理,它将触发并返回FIRE或者FIRE_AND_PURGE。这是窗口操作发送当前窗口结果的信号,发送给一个带有ProcessWindowFunction的窗口,所有元素都被传递给ProcessWindowFunction(可能在将它们传递给回收器之后)。

信号发送给具有ReduceFunction、AggregateFunction或FoldFunction的窗口只发出它们聚合的结果。

当一个触发器触发时,它可以是FIRE或者FIRE_AND_PURGE,如果是FIRE的话,将保持window中的内容,FIRE_AND_PURGE的话会清除window的内容。默认情况下,预实现的触发器仅仅是FIRE,不会清除window的状态。

注意:清除操作仅清除window的内容,并留下潜在的窗口元信息和完整的触发器状态。

3、默认触发器
每一个窗口分配器都有一个默认的触发器

WindowAssigner的默认触发器覆盖了很多场景(基本够用了)。
例如,所有event-time window assigner都有一个EventTimeTrigger作为默认触发器。只要水印通过窗口的末端,这个触发器就会触发。

GlobalWindow的默认触发器是永不触发的NeverTrigger。因此在使用GlobalWindow时,必须定义一个自定义触发器。

通过使用trigger()指定触发器,将覆盖WindowAssigner的默认触发器。例如,如果你为TumblingEventTimeWindows指定了CountTrigger,那么将不再根据时间的进度获得窗口触发,而只根据计数。如果希望根据时间和计数进行响应,就必须编写自己的触发器。

4、内置和自定义触发器
4.1、Flink内置的触发器:
EventTimeTrigger(前面提到过) 根据由watermark衡量的Event Time进度来触发
ProcessingTimeTrigger 根据处理时间来触发
CountTrigger 一旦窗口中的元素个数超出了给定的限制就会触发
PurgingTrigger 接受另一个触发器作为参数,并将其转换为一个purging触发器(当嵌套触发器触发时,将返回FIRE_AND_PURGE类型的TriggerResult)

4.2、自定义触发器
实现Trigger

以上是关于Flink自定义触发器的主要内容,如果未能解决你的问题,请参考以下文章

Flink 自定义触发器实现带超时时间的 countAndTimeTrigger

flink02------1.自定义source

Flink实战系列Flink SQL 写入 kafka 自定义分区策略

Flink SQL 自定义 format

1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink

Flink 自定义 SQL Connector