FlinkFlink 源码之时间处理
Posted 九师兄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink 源码之时间处理相关的知识,希望对你有一定的参考价值。
1.概述
2.Flink支持的时间类型
- EventTime: 每条数据都携带时间戳。Operator处理数据的时候所有依赖时间的操作依据数据携带的时间戳。可以支持乱序数据的处理。时间戳信息可以在数据源产生数据的时候指定(SourceFunction的中调用context的collectWithTimestamp收集元素),也可以使用DataStream的assignTimestampsAndWatermarks指定。通常来说在每条数据中会有一个字段存储时间戳信息。
- ProcessingTime: 数据不携带任何时间戳的信息。operator使用系统当前时间作为每一条数据的处理时间。如果数据存在乱序的情况,Flink无法察觉。ProcessingTime为系统的默认值。
- IngestionTime: 和EventTime 类似,不同的是Flink会使用系统时间作为timestamp绑定到每条数据(数据进入Flink系统的时候使用系统当前时间为时间戳绑定数据)。可以防止Flink内部处理数据是发生乱序的情况。但无法解决数据到达Flink之前发生的乱序问题。如果需要处理此类问题,建议使用EventTime。
3.设置Flink系统使用的时间类型
使用Environment的setStreamTimeCharacteristic
方法指定系统使用的时间类型。方法参数为TimeCharacteristic
。
TimeCharacteristic
为枚举类型,定义如下。
@PublicEvolving
public enum TimeCharacteristic
ProcessingTime,
IngestionTime,
EventTime
和之前所说的时间类型一一对应。
StreamExecutionEnvironment
的setStreamTimeCharacteristic
方法源码如下:
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic)
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime)
getConfig().setAutoWatermarkInterval(0);
else
getConfig().setAutoWatermarkInterval(200);
这里我们发现如果系统TimeCharacteristic
为EventTime
或者IngestionTime,会设置一个默认的自动watermark间隔时间(auto watermark interval)。这个参数是用来对齐集群中所有机器的watermark的。所有发送到下游的watermark一定是auto watermark interval的整数倍(通过源码分析发现该配置仅对IngestionTime生效)。具体逻辑在下文StreamSourceContexts部分分析。
4.StreamSourceContexts
StreamSourceContexts
类负责根据系统的TimeCharacteristic
来决定生成哪种类型的SourceContext。SourceContext在SourceFunction使用(参见 Flink 使用之数据源),不同的SourceContext对数据timestamp处理的行为不同。
SourceFunction
中使用的SourceContext由getSourceContext方法决定。
getSourceContext方法的调用链如下所示:
- SourceStreamTask中的
LegacySourceFunctionThread.run
:headOperator.run(getCheckpointLock(), getStreamStatusMaintainer(), operatorChain);
在这一行代码中传入了StreamStatusMaintainer
。可以追溯到StreamTask的getStreamStatusMaintainer
方法,返回的是一个OperatorChain。 StreamSource.run: this.ctx = StreamSourceContexts.getSourceContext
getSourceContext方法的源码如下:
public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
TimeCharacteristic timeCharacteristic,
ProcessingTimeService processingTimeService,
Object checkpointLock,
StreamStatusMaintainer streamStatusMaintainer,
Output<StreamRecord<OUT>> output,
long watermarkInterval,
long idleTimeout)
final SourceFunction.SourceContext<OUT> ctx;
switch (timeCharacteristic)
case EventTime:
ctx = new ManualWatermarkContext<>(
output,
processingTimeService,
checkpointLock,
streamStatusMaintainer,
idleTimeout);
break;
case IngestionTime:
ctx = new AutomaticWatermarkContext<>(
output,
watermarkInterval,
processingTimeService,
checkpointLock,
streamStatusMaintainer,
idleTimeout);
break;
case ProcessingTime:
ctx = new NonTimestampContext<>(checkpointLock, output);
break;
default:
throw new IllegalArgumentException(String.valueOf(timeCharacteristic));
return ctx;
从源码可以看出,SourceContext有三种:
- EventTime使用
ManualWatermarkContext
- ProcessingTime使用
NonTimestampContext
- IngestionTime使用
AutomaticWatermarkContext
其中ManualWatermarkContext
和AutomaticWatermarkContext
具有相同的父类WatermarkContext。
下面逐个分析WatermarkContext的方法。
4.1 WatermarkContext类
@Override
public void collect(T element)
// 防止和checkpoint操作同时进行
synchronized (checkpointLock)
// 改变stream的状态为ACTIVE状态
streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
if (nextCheck != null)
// failOnNextCheck:如果下一个空闲检查已被安排,需要设置为true。当元素被collect之后,需要设置该变量为false。
this.failOnNextCheck = false;
else
scheduleNextIdleDetectionTask();
processAndCollect(element);
WatermarkContext的streamStatusMaintainer
只有一个实现类OperatorChain
。该变量由StreamTask
的operatorChain
传入。
nextCheck
为ScheduledFuture
类型。
failOnNextCheck
:如果下一个空闲检查已被安排,需要设置为true。当元素被collect之后,需要设置该变量为false。
如果没有安排下一次空闲检查,需要调用scheduleNextIdleDetectionTask
。代码稍后分析。
最后调用processAndCollect
方法,包含具体的处理和收集数据的逻辑。该方法为抽象方法,稍后分析。
scheduleNextIdleDetectionTask
代码如下:
private void scheduleNextIdleDetectionTask()
if (idleTimeout != -1)
// reset flag; if it remains true when task fires, we have detected idleness
failOnNextCheck = true;
// 安排一个空闲检测任务。该任务在idleTimeout之后执行
// getCurrentProcessingTime()返回的是系统当前时间
nextCheck = this.timeService.registerTimer(
this.timeService.getCurrentProcessingTime() + idleTimeout,
new IdlenessDetectionTask());
IdlenessDetectionTask
的源码如下:
private class IdlenessDetectionTask implements ProcessingTimeCallback
@Override
public void onProcessingTime(long timestamp) throws Exception
synchronized (checkpointLock)
// set this to null now;
// the next idleness detection will be scheduled again
// depending on the below failOnNextCheck condition
// 设置nextCheck为null
// 这样下次调用collect方法的时候会再次安排一个空闲检测任务
nextCheck = null;
if (failOnNextCheck)
// 标记数据源为空闲
markAsTemporarilyIdle();
else
// 再次安排一个空闲检测任务
scheduleNextIdleDetectionTask();
markAsTemporarilyIdle
方法:
@Override
public void markAsTemporarilyIdle()
synchronized (checkpointLock)
// 设置operatorChain的状态为空闲
streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
经过以上分析我们不难发现collect方法具有自动空闲检测的功能
。数据被收集的时候会设置stream为active状态
,并设置一个空闲检查任务
。该任务会在idleTimeout
时间之后触发。如果在此期间内,仍没有数据被数据源采集,该数据源会被标记为空闲
。如果期间内有数据到来,failOnNextCheck会被设置为false。此时空闲检测任务执行之后便不会标记数据源为空闲状态,取而代之的是再次安排一个空闲检测任务。
collectWithTimestamp
方法在收集元素的同时,为元素绑定时间戳。代码如下:
@Override
public void collectWithTimestamp(T element, long timestamp)
synchronized (checkpointLock)
streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
if (nextCheck != null)
this.failOnNextCheck = false;
else
scheduleNextIdleDetectionTask();
processAndCollectWithTimestamp(element, timestamp);
这段方法和collect方法的逻辑完全一致。同样具有定期检测数据源是否闲置的功能。在方法最后调用了子类的processAndCollectWithTimestamp
方法。
emitWatermark方法用于向下游发送watermark。代码如下:
@Override
public void emitWatermark(Watermark mark)
// 此处多了一个判断,在允许使用watermark的情形下才会调用
if (allowWatermark(mark))
synchronized (checkpointLock)
streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
if (nextCheck != null)
this.failOnNextCheck = false;
else
scheduleNextIdleDetectionTask();
processAndEmitWatermark(mark);
此方法的逻辑和collect方法逻辑基本一致,不再赘述。
close方法用于关闭SourceContext,该方法会取消下一次空闲检测任务。代码如下:
@Override
public void close()
cancelNextIdleDetectionTask();
4.2 ManualWatermarkContext 类
EventTime时间类型使用的是ManualWatermarkContext。ManualWatermarkContext相比父类多了两个成员变量:
- output: 负责输出数据流中的元素。对于StreamSource而言output为AbstractStreamOperator$CountingOutput包装的RecordWriterOutput
- reuse:数据流中一个元素的包装类。该类在此被复用,不必反复创建。
ManualWatermarkContext实现父类的方法如下:
@Override
protected void processAndCollect(T element)
output.collect(reuse.replace(element));
@Override
protected void processAndCollectWithTimestamp(T element, long timestamp)
output.collect(reuse.replace(element, timestamp));
@Override
protected void processAndEmitWatermark(Watermark mark)
output.emitWatermark(mark);
@Override
protected boolean allowWatermark(Watermark mark)
// 永远允许发送watermark,所以返回true
return true;
4.3 AutomaticWatermarkContext 类
IngestionTime时间类型使用的是AutomaticWatermarkContext。
此类的构造方法如下:
private AutomaticWatermarkContext(
final Output<StreamRecord<T>> output,
final long watermarkInterval,
final ProcessingTimeService timeService,
final Object checkpointLock,
final StreamStatusMaintainer streamStatusMaintainer,
final long idleTimeout)
super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);
this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
Preconditions.checkArgument(watermarkInterval >= 1L, "The watermark interval cannot be smaller than 1 ms.");
// 通过 auto watermark interval配置
this.watermarkInterval = watermarkInterval;
this.reuse = new StreamRecord<>(null);
this.lastRecordTime = Long.MIN_VALUE;
// 获取系统当前时间
long now = this.timeService.getCurrentProcessingTime();
// 设置一个watermark发送定时器,在watermarkInterval时间之后触发
this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
new WatermarkEmittingTask(this.timeService, checkpointLock, output));
WatermarkEmittingTask主要代码逻辑如下:
@Override
public void onProcessingTime(long timestamp)
// 获取系统当前时间
final long currentTime = timeService.getCurrentProcessingTime();
// 加锁,不能和checkpoint操作同时运行
synchronized (lock)
// we should continue to automatically emit watermarks if we are active
// 需要OperatorChain的状态为ACTIVE
if (streamStatusMaintainer.getStreamStatus().isActive())
// idleTimeout 不等于-1意味着设置了数据源的空闲超时时间
// 发送watermark的时候也检查数据源空闲时间
if (idleTimeout != -1 && currentTime - lastRecordTime > idleTimeout)
// if we are configured to detect idleness, piggy-back the idle detection check on the
// watermark interval, so that we may possibly discover idle sources faster before waiting
// for the next idle check to fire
markAsTemporarilyIdle();
// no need to finish the next check, as we are now idle.
cancelNextIdleDetectionTask();
else if (currentTime > nextWatermarkTime)
// align the watermarks across all machines. this will ensure that we
// don't have watermarks that creep along at different intervals because
// the machine clocks are out of sync
// 取watermarkTime 为最接近currentTime 的watermarkInterval整数倍
// 这称为watermark对齐操作,因为集群机器的时间是不同步的
final long watermarkTime = currentTime - (currentTime % watermarkInterval);
// 发送watermark
output.emitWatermark(new Watermark(watermarkTime));
// 设置下次发送的watermark的时间,注意和下次执行发送watermark任务的时间不同
nextWatermarkTime = watermarkTime + watermarkInterval;
// 再次安排一个watermark发送任务
long nextWatermark = currentTime + watermarkInterval;
nextWatermarkTimer = this.timeService.registerTimer(
nextWatermark, new WatermarkEmittingTask(this.timeService, lock, output));
通过以上分析我们不难发现AutomaticWatermarkContext是自动定时发送watermark到下游的。发送的间隔为watermarkInterval。
processAndCollect方法和逻辑如下所示:
@Override
protected void processAndCollect(T element)
lastRecordTime = this.timeService.getCurrentProcessingTime();
output.collect(reuse.replace(element, lastRecordTime));
// this is to avoid lock contention in the lockingObject by
// sending the watermark before the firing of the watermark
// emission task.
// lastRecordTime如果大于nextWatermarkTime需要立即发送一次watermark
// nextWatermarkTime为下次要发送的watermark的时间,和下次执行发送watermark任务的时间不同
// 发送的watermark的时间一定比执行发送watermark任务的时间早
// 如果没有此判断,到下次发送watermark任务执行之后,发送的watermark时间会早于这条数据的时间,下游不会及时处理这条数据。
if (lastRecordTime > nextWatermarkTime)
// in case we jumped some watermarks, recompute the next watermark time
final long watermarkTime = lastRecordTime - (lastRecordTime % watermarkInterval);
// nextWatermarkTime比lastRecordTime大
// 因此下游会立即开始处理这条数据
nextWatermarkTime = watermarkTime + watermarkInterval;
output.emitWatermark(new Watermark(watermarkTime));
// we do not need to register another timer here
// because the emitting task will do so.
processAndCollectWithTimestamp
方法如下所示。第二个参数timestamp被忽略。IngestionTime使用系统时间作为元素绑定时间。
@Override
protected void processAndCollectWithTimestamp(T element, long timestamp)
processAndCollect(element);
最后我们分析下allowWatermark
和processAndEmitWatermark
方法。AutomaticWatermarkContext
不允许我们显式要求发送watermark
。只能通过定时任务发送。只有当waterMark
时间为Long.MAX_VALUE
并且nextWatermarkTime
不为Long.MAX_VALUE
才可以发送。发送过这个特殊的watermark之后,关闭定时发送watermark的任务
。代码如下所示:
@Override
protected boolean allowWatermark(Watermark mark)
// allow Long.MAX_VALUE since this is the special end-watermark that for example the Kafka source emits
return mark.getTimestamp() == Long.MAX_VALUE && nextWatermarkTime != Long.MAX_VALUE;
/** This will only be called if allowWatermark returned @code true. */
@Override
protected void processAndEmitWatermark(Watermark mark)
nextWatermarkTime = Long.MAX_VALUE;
output.emitWatermark(mark);
// we can shutdown the watermark timer now, no watermarks will be needed any more.
// Note that this procedure actually doesn't need to be synchronized with the lock,
// but since it's only a one-time thing, doesn't hurt either
final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
if (nextWatermarkTimer != null)
nextWatermarkTimer.cancel(true);
4.4 NonTimestampContext 类
这个类比较简单,不处理任何和timestamp相关的逻辑。也不会发送任何watermark。在此不做过多的分析。
5 ProcessingTime 调用链
InternalTimeServiceImpl.registerProcessingTimeTimer
SystemProcessingTimeService.registerTimer
SystemProcessingTimeService.wrapOnTimerCallback
ScheduledTask.run
TimerInvocationContext.invoke
InternalTimeServiceImpl.onProcessingTime(): triggerTarget.onProcessingTime(timer);
4.5.1 InternalTimeServiceImpl.registerProcessingTimeTimer
registerProcessingTimeTimer方法注册一个ProcessingTime定时器:
@Override
// 该方法主要在windowOperator和SimpleTimerService中调用
// 在windowOperator调用,namespace传入当前window
// 在SimpleTimerService调用,namespace传入VoidNamespace.INSTANCE
public void registerProcessingTimeTimer(N namespace, long time)
// 这是一个PriorityQueue。获取timestamp最小的timer
InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
// 如果新加入的timer的timestamp是最小的,方法返回true
if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace)))
long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
// check if we need to re-schedule our timer to earlier
// 如果新加入的timer的timetstamp在队列中最小(最先执行)
// 需要取消掉原有的timer
// 再重新注册timer,timestamp为新加入timer的timetstamp
if (time < nextTriggerTime)
if (nextTimer != null)
nextTimer.cancel(false);
nextTimer = processingTimeService.registerTimer(time, this);
InternalTimeServiceImpl
维护了一个processingTimeTimersQueue
变量。该变量是一个有序的队列,存储了一系列定时器对象。
InternalTimeServiceManager
在获取InternalTimeServiceImpl
会调用它的startTimerService
方法。该方法会把第一个(时间最早的timer)注册到一个ScheduledThreadPoolExecutor
上。因此第一个timer到时间的时候会调用InternalTimeServiceImpl
的onProcessingTime
方法。
InternalTimeServiceImpl的onProcessingTime
方法代码如下:
@Override
public void onProcessingTime(long time) throws Exception
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;
InternalTimer<K, N> timer;
// 一直循环获取时间小于参数time的所有定时器,并运行triggerTarget的onProcessingTime方法
// 例如WindowOperator中的internalTimerService,triggerTarget就是WindowOperator自身
while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time)
processingTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onProcessingTime(timer);
// 执行到这一步的时候timer的timetamp刚好大于参数time
// 此时在安排下一个定时器
if (timer != null && nextTimer == null) FlinkFlink 源码之AsyncFunction异步 IO 源码