Flink - Timer 与 TimerService 源码分析与详解

Posted BIT_666

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink - Timer 与 TimerService 源码分析与详解相关的知识,希望对你有一定的参考价值。

目录

一.引言 

二.Timer 简介与特性

1.Timer 四大特性

1.1 Timers are registered on a KeyedStream

1.2 Timers are automatically deduplicated

1.3 Timers are checkpointed

1.4 Timers can be deleted

2.Timer 常用方法

2.1 注册 Timer

2.2 删除 Timer

2.3 更多

三.Timer 相关组件 - keyedStream && keyedProcessFunction

1.架构关系

2.代码浅析 

3.KeyedProcessFunction

四.Timer 相关组件 - Context && TimerService

1.TimerService 初始化

1.1  internalTimerService

1.2 SimpleTimerService

1.3 timeServiceManager

2.InternalTimeServiceManagerImpl

2.1 priorityQueueSetFactory

2.2 timerServices

2.3 TimerHeapInternalTimer

3.InternalTimerServiceImpl

3.1 注册 Timer 与 TimeStamp

3.2 onTimer 回调

3.3 调用关系

五.总结


一.引言 

Timer 是 Flink 基于 ProcessFunction 引入的一种时间计时器,可以基于处理时间和事件时间,仅支持基于 keyedStream 及其衍生 Stream 使用。下面是 Timer 的最常用场景:

  • A. processElement() 处理 Event 获得一个 Context 对象
  • B. Context 对象访问元素的事件时间与 TimerService
  • C. TimerService 为当前 Event 注册或删除 Timer
  • D. 当 WaterMark 达到 Timer 注册时间时回调 onTimer 函数执行对应回调逻辑

二.Timer 简介与特性

1.Timer 四大特性

1.1 Timers are registered on a KeyedStream

因为 Timer 是基于每个键即 key 注册并触发,所以 KeyedStream 是 Timer 在 Flink 中使用的先决条件

ctx.timerService.deleteEventTimeTimer(timeStamp)

1.2 Timers are automatically deduplicated

TimerService 会自动消除计时器的重复数据,始终保持每个键 key 最多只有一个计时器,当一个键 key 注册多个 Timer 计时器时,onTimer 方法只会调用一次,重复注册会覆盖之前的 timer 注册

1.3 Timers are checkpointed

ValueState 可以通过 checkpoint 进行检查点保存和恢复,同理 Timer 也可以由 checkpoint 托管,从 Flink checkpoint 检查点恢复任务时,将立即启动恢复前应启动的处于恢复状态的每个已注册计时器,这也提高了 Timer 的容错性

1.4 Timers can be deleted

从 Flink 1.6.x 开始,计时器可以暂停和删除,提供更便捷的 Timer 处理方式

2.Timer 常用方法

2.1 注册 Timer

ctx.timerService.registerEventTimeTimer(timestamp)

通过 timerService 注册事件时间触发 Timer,并在到期时执行 onTimer 回调函数

2.2 删除 Timer

ctx.timerService.deleteEventTimeTimer(timestamp)

通过 timerService 删除时间对应的 Timer,如果当前事件未注册相关时间戳 Timer,则删除无效

2.3 更多

TimerService 通过 context 调用,自 1.12.x 版本起,Flink 采用 EventTime 作为默认时间机制,所以本文主要讨论基于 EventTime 的 Timer 相关,ProcessTime 注册 Timer 方法如下:

ctx.timerService.registerProcessingTimeTimer(timestamp) // 注册
ctx.timerService.deleteProcessingTimeTimer(timestamp) // 删除

三.Timer 相关组件 - keyedStream && keyedProcessFunction

通过前面的描述,我们已经了解了 Timer 是基于 KeyedStream 的一种时间处理机制,其主要通过 ProcessFunction 内的 context 获取 TimerService 实现相关时间的注册与删除,下面看下keyedStream && keyedProcessFunction 的组织构成。

1.架构关系

keyedStream 基于 keyedProcessFunction 处理,keyedProcessFunction 底层算子为 KeyedProcessOperator,下面看下相关类图,按从低到高排列:

public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable 

public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, CheckpointedStreamOperator, Serializable 

public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> 

public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT>, Input<IN> 

public class KeyedProcessOperator<K, IN, OUT> extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>> implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> 

上述关系可以转换为下图关系,这里 StreamOperator 继承 Disposable 和 Serializable 并未展示

2.代码浅析 

下图为 KeyedOperator<K, IN, OUT> 部分源码实现部分,K 代表 keyedStream 的 keyBy 类型,IN,OUT 代表当前 Operator 的处理输入与输出的泛型。

- open

open 函数内可以看到 timerService 通过 internalTimerService 初始化为 SimpleTimerService,后续将用户自定义的 UDF-ProcessFunction 与该 TimerService 传入 ContextImpl 实现 processElement 方法中的 context,同时传入 OnTimerContextImpl 生成 onTimer 函数逻辑内的 onTimerContext,同时生成 collector 用户数据的传输,通过 collect 方法即可输出数据到下游

- onEventTime / onProcessTime

当注册 Timer 对应的时间戳触发时,根据对应时间模式调用相应的处理逻辑。

- processElement 

通过 UDF-ProcessFunction 处理函数逻辑,可以通过 context 获取上下文变量,也可以通过 collector 输出数据

3.KeyedProcessFunction

通过源码可以看到,processElement 与 onTimer 调用的 context 并非同一个:

常用的除了 keyedProcessFunction,还有 KeyedBroadcastProcessFunction,该处理函数用于 keyedStream 与 BroadcastStream 的结合处理,可以看到该场景下,会存在三种不同的 context 对应三个处理函数:

关于广播流与 KeyedBroadcastProcessFunction 的使用,大家可以参考 Flink / Scala - DataStream Broadcast State 模式示例详解。keyedStream 与 keyedProcessFunction 为外层包装 API,主要了解其结构与相关方法含义即可,下面针对 Timer 更底层的 context 与 TimerService 进行挖掘。

四.Timer 相关组件 - Context && TimerService

1.TimerService 初始化

从 KeyedProcessOperator 可以看到 TimerService 的初始化过程:

InternalTimerService -> SimpleTimerService -> TimerService

1.1  internalTimerService

InternalTimerService 通过 getInternalTimerService 获取,位于 AbstractStreamOperator 类内,构造参数主要包含 name 名称,命名空间类型 N 的序列化器与 Trigger<K, N> 的触发器,这里 Trigger 也并不陌生,之前 Flink - Scala/Java trigger 简介与使用 中介绍了 Flink 的 Trigger 系统,主要用于触发相关操作,一般使用 TriggerResult.CONTINUE 跳过操作,TriggerResult.FIRE 触发操作。

Tips: internalTimerService 底层操作基于 timeServiceManager。

但其实 InternalTimerService 本身仍然是一个接口,从 register 和 delete 相关方法我们已经看到 Timer 的影子。

1.2 SimpleTimerService

该类只是在 InternalTimerService 接口的基础上进行了包装,非常简单:

1.3 timeServiceManager

InternalTimerService 由 internalTimerServiceManager 统一管理:

AbstractStreamOperator 方法内 timeServiceManager 通过 context 初始化 internalTimerServiceManager 得到,而 internalTimerServiceManager<K> 实际上也是接口,其真正实现是在 InternalTimeServiceManagerImpl<K> 类内,下面主要分析下 InternalTimeServiceManagerImpl。

2.InternalTimeServiceManagerImpl

通过 InternalTimeServiceManagerImpl 的初始化变量可以看到其主要负责内容。

2.1 priorityQueueSetFactory

该方法生成 KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>, 实际生成类为 HeapPriorityQueueSet:

HeapPriorityQueueSet 包含多个参数:

keyGroup:相当于 key 对应的分组,与对应 processFunction 并行度一致,最常见的方法即 key.hashCode() % parallelism

keyGroupRange:连续 keyGroup 的范围,一般而言每个 sub-task 都会维护一份 KeyGroupRange 包含其本地的所有 keyGroup

deduplicationMapsByKeyGroup:则实现了去重,对应 Flink Timer 的特性2 - Timers are automatically deduplicated

totalNumberOfKeyGroups:全部 keyGroup 的数量

Tips:

HeapPriorityQueueSet 在 PriorityQueue 优先队列的基础上增加了去重的原理,因为 putIfAbsent() 方法会先判断指定的键(key)是否存在,不存在则将键/值对插入到 HashMap 中,这里也对应了 Flink Timer 的第二个特性,TimerService 自动去除重复数据。PriorityQueue 的实现原理基于堆,有需要的同学可以参考 Scala - PriorityQueue 踩坑之不保序

2.2 timerServices

timerServices Map<String, InternalTimerServiceImpl<K, ?>> 通过 Map 维护多个 InternalTimerServiceImpl,即 InternalTimeServiceManagerImpl 可以管理对应 K 多个 InternalTimerServiceImpl。

获取 timerService 时,会先判断对应 name 的 Service 是否存在,如果不存在则创建,存在则直接获取:

timerService = new InternalTimerServiceImpl(
this.localKeyGroupRange, 
this.keyContext, 
this.processingTimeService, 
this.createTimerPriorityQueue("_timer_state/processing_" + name, timerSerializer), 
this.createTimerPriorityQueue("_timer_state/event_" + name, timerSerializer));

可以看到对应 TimerService 就是 InternalTimerServiceImpl,其初始化会分别为 ProcessTime 和 EventTime 创建 PriorityQueue 。

2.3 TimerHeapInternalTimer

 可以看到 PriorityQueue 创建时对应的基类为 TimerHeapInternalTimer:

这里基类提供了4个字段:

timestamp: 时间戳

key: 分区键 key

namespace:命名空间,可以看作是基于 operator 维度的标识

timerHeapIndex: timer 在堆中的位置索引,由于堆积于二叉树实现,所以可以根据索引确定位置

还有一个主要方法:

cinoarePriorityTo:判断不同 TimerHeapInternalTimer 之间的优先级,可以看到非常简单,就是基于各自的时间戳进行比较,通过 Long.compare 也可以看出,PriorityQueue 实现为最小堆,因为堆顶应该是时间最小,最先触发的 timestamp

Tips:

从上面可以看出,一个 TimerHeapInternalTimer 可以通过 name + scope 确定,作为 operator 的 scope 我们其实不感知,所以可以看做是基于 key 级别处理,对应特性一,只能应用于 keyedStream,关于 scope 最好的例子就是 Flink / Scala - ProcessFunction 之间共用缓存测试,本文中,我们基于相同的 key 在两个 processFunction 中存储 ValueState,虽然 key 相同,但是两个 Function 函数不互通,原因就是不同 processFunction 对应不同 scope 的 operator,所以 key + scope 映射到单独 operator 内的单独去重 key 上。

3.InternalTimerServiceImpl

3.1 注册 Timer 与 TimeStamp

基于上面的分析,我们现在了解了流程,一个 Timer 类需要首先 keyedProcessOpeartor 注册生成 TimerService,而 TimerService 则是通过注册生成 InternalTimeServiceManagerImpl 类用来管理并实现对 key+scope 命名空间下 Timer 的管理,其管理 K 的 InternalTimerServiceImpl,而 InternalTimerServiceImpl 管理的最小单位为 TimerHeapInternalTimer,通过 heap + TimerHeapInternalTimer.timeStamp 完成时间的从小到大排序。

这里可以看到,为某个 operator scope 下的 key 注册 EventTimeStamp 或者删除就是将 TimerHeapInternalTimer 添加或从堆中移除,正如上面所说,该堆为基于时间戳的最小堆,堆顶为最先到消费时间的 Timer 类。 

3.2 onTimer 回调

InternalTimerServiceImpl 类内通过 triggerTarget 实现对 onProcessTime 和 advanceWatermark 进行 ProcessTime 和 EventTime 的 onTimer 回调触发。

由于 EventTime 需要基于 watermark 推进事件时钟的前进,所以方法名略有不同,执行的逻辑也很明白,判断当前 time 是否与最小堆的堆顶 TimerHeapInternalTimer 的 timestamp 相等,如果相等,则代表到达对应 Event 的触发事件,执行 poll 并通过 triggerTarget 调用 onEventTime。

查看对应 keyedProcessOperator 可以看出这里 invoke 调用 UDF UserFunction,最终调用至 onTime (timer.getTimestamp(), this.onTimerContext,this.collector):

3.3 调用关系

上面我们通过 InternalTimerServiceImpl 调用了 advanceWatermark 触发了 EventTime 模式下的 onTimer,InternalTimerServiceImpl 受到 InternalTimeServiceManagerImpl 的管理,InternalTimeServiceManagerImpl 下 advanceWatermark 会遍历 timeServers 中的多个 InternalTimerServiceImpl,并以此触发 InternalTimerServiceImpl 的 advanceWatermark:

而 InternalTimeServiceManagerImpl 则是在 AbstractStreamOperator 内部实现了初始化:

可以看出  AbstractStreamOperator 类在处理 Watermark 时实现 advanceWatermark 的调用,即调用 InternalTimeServiceManagerImpl 触发每个 InternalTimerServiceImpl 的 advanceWatermark,随后 emitWatermark 发出新的水印。

五.总结

兜兜转转,从 AbstractStreamOperator 开始,我们一路引用至 triggerTarget 回调 onEvent 触发 ProcessFunction 的 onTimer 函数,又从触发 onTimer 的 onEventTime 一步一步回退至最初的 AbstractStreamOperator,一次完整的 Timer 管理 EventTimer 就结束了。其内部主要知识点:

  • 特性1. 通过 key + scope 的模式实现了基于 keyedSteam 实现 TimerService
  • 特性2. 通过 HeapPriorityQueueSet 实现了 TimerService 的去重
  • 特性3. 通过 checkpoint 实现了 TimeService 保存与恢复
  • 特性4. 通过 PriorityQueue 实现 TimerService 的添加与删减

即我们问文章开头提到的 4 characteristics of Timers in Apache Flink:

最后祝各位中秋节快乐🎑

以上是关于Flink - Timer 与 TimerService 源码分析与详解的主要内容,如果未能解决你的问题,请参考以下文章

Flink Timer 机制原理,源码整理。

Flink的定时器(EventTime和ProcessTime)

PyFlink 教程:PyFlink DataStream API - state & timer

PyFlink 教程:PyFlink DataStream API - state & timer

从0到1Flink的成长之路-Flink Action 综合案例-订单自动好评

从0到1Flink的成长之路-Flink Action 综合案例-订单自动好评