[Flink] Flink Source Code: Buffer Debloating
Posted by 九师兄
1. Overview
Reposted from: 【Flink】Flink 源码之Buffer Debloating
2. What is buffer debloating
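Buffer debloating is an optimization introduced in Flink 1.14. Given a target (the expected time needed to fully consume the buffered data, configured by taskmanager.network.memory.buffer-debloat.target), it automatically estimates and controls the amount of in-flight data (data buffered in operator input and output queues). With less in-flight data, checkpoints take less time, and checkpoint size and recovery time shrink as well. The effect is more pronounced with unaligned checkpoints. For how to enable and configure it, see "Flink 使用之状态和checkpoint".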
3. Buffer debloating source code walkthrough
When a StreamTask starts ingesting data, it schedules a buffer debloater timer task whose trigger interval is taskmanager.network.memory.buffer-debloat.period.
@Override
public final void invoke() throws Exception {
    // Allow invoking method 'invoke' without having to call 'restore' before it.
    if (!isRunning) {
        LOG.debug("Restoring during invoke will be called.");
        restoreInternal();
    }

    // final check to exit early before starting to run
    ensureNotCanceled();

    // Schedule the buffer debloating timer task
    scheduleBufferDebloater();

    // let the task do its work
    runMailboxLoop();

    // if this left the run() method cleanly despite the fact that this was canceled,
    // make sure the "clean shutdown" is not attempted
    ensureNotCanceled();

    afterInvoke();
}
The scheduleBufferDebloater method registers a timer with systemTimerService, which periodically triggers the debloat task.
private void scheduleBufferDebloater() {
    // See https://issues.apache.org/jira/browse/FLINK-23560
    // If there are no input gates, there is no point of calculating the throughput and running
    // the debloater. At the same time, for SourceStreamTask using legacy sources and checkpoint
    // lock, enqueuing even a single mailbox action can cause performance regression. This is
    // especially visible in batch, with disabled checkpointing and no processing time timers.
    // Return immediately if there are no input gates or buffer debloating is disabled
    if (getEnvironment().getAllInputGates().length == 0
            || !environment
                    .getTaskManagerInfo()
                    .getConfiguration()
                    .getBoolean(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED)) {
        return;
    }
    // Register a timer that calls debloat() once the buffer debloat period has elapsed.
    // The period is set by taskmanager.network.memory.buffer-debloat.period
    systemTimerService.registerTimer(
            systemTimerService.getCurrentProcessingTime() + bufferDebloatPeriod,
            timestamp ->
                    mainMailboxExecutor.execute(
                            () -> {
                                debloat();
                                // Schedule the next run, so that the call repeats periodically
                                scheduleBufferDebloater();
                            },
                            "Buffer size recalculation"));
}
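The pattern above, a one-shot timer whose callback registers the next one-shot timer, can be tried out in isolation. The following is only a minimal sketch and not Flink code: FakeTimerService and PeriodicDebloatSketch are hypothetical names, the fake clock is advanced by hand, and the callback runs directly instead of being handed to a mailbox executor.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for a timer service: fires one-shot callbacks in timestamp order. */
class FakeTimerService {
    private static final class Timer {
        final long timestamp;
        final Runnable callback;

        Timer(long timestamp, Runnable callback) {
            this.timestamp = timestamp;
            this.callback = callback;
        }
    }

    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
    private long now = 0;

    long getCurrentProcessingTime() {
        return now;
    }

    void registerTimer(long timestamp, Runnable callback) {
        timers.add(new Timer(timestamp, callback));
    }

    /** Advances the fake clock and fires every timer that becomes due on the way. */
    void advanceTo(long newNow) {
        while (!timers.isEmpty() && timers.peek().timestamp <= newNow) {
            Timer t = timers.poll();
            now = t.timestamp;
            t.callback.run();
        }
        now = newNow;
    }
}

/** Hypothetical task that re-registers itself after every run. */
class PeriodicDebloatSketch {
    private final FakeTimerService timerService;
    private final long periodMillis;
    final List<Long> firedAt = new ArrayList<>();

    PeriodicDebloatSketch(FakeTimerService timerService, long periodMillis) {
        this.timerService = timerService;
        this.periodMillis = periodMillis;
    }

    void schedule() {
        timerService.registerTimer(
                timerService.getCurrentProcessingTime() + periodMillis,
                () -> {
                    // Stand-in for debloat(): just record when the task ran.
                    firedAt.add(timerService.getCurrentProcessingTime());
                    // Register the next one-shot timer, which makes the task periodic.
                    schedule();
                });
    }

    public static void main(String[] args) {
        FakeTimerService timers = new FakeTimerService();
        PeriodicDebloatSketch sketch = new PeriodicDebloatSketch(timers, 200);
        sketch.schedule();
        timers.advanceTo(1000);
        System.out.println(sketch.firedAt); // prints [200, 400, 600, 800, 1000]
    }
}
```

Because each run registers its successor only after it has finished, two runs of the task never overlap, and no further timers are registered once the task stops rescheduling itself.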
From this we can see that the main debloating logic lives in the debloat method.
void debloat() {
    for (IndexedInputGate inputGate : environment.getAllInputGates()) {
        inputGate.triggerDebloating();
    }
}
The debloat method calls triggerDebloating on every inputGate in turn.
Let's look at the triggerDebloating method of SingleInputGate.
@Override
public void triggerDebloating() {
    if (isFinished() || closeFuture.isDone()) {
        return;
    }

    checkState(bufferDebloater != null, "Buffer debloater should not be null");
    final long currentThroughput = throughputCalculator.calculateThroughput();
    bufferDebloater
            // Recalculate the buffer size
            .recalculateBufferSize(currentThroughput, getBuffersInUseCount())
            // A non-empty result means the buffer size needs to be updated:
            // announce the new buffer size to every channel
            .ifPresent(this::announceBufferSize);
}
First we look at the calculateThroughput method of the throughput calculator, ThroughputCalculator, which computes the throughput over the measurement interval.
public long calculateThroughput() {
    if (measurementStartTime != NOT_TRACKED) {
        // Get the current time
        long absoluteTimeMillis = clock.absoluteTimeMillis();
        // Add the elapsed time to the duration of the measurement period
        currentMeasurementTime += absoluteTimeMillis - measurementStartTime;
        // Set the start time of the next measurement period
        measurementStartTime = absoluteTimeMillis;
    }

    // Calculate the throughput from the data size and time accumulated over this period
    long throughput = calculateThroughput(currentAccumulatedDataSize, currentMeasurementTime);

    // Reset the accumulators
    currentAccumulatedDataSize = currentMeasurementTime = 0;

    return throughput;
}

public long calculateThroughput(long dataSize, long time) {
    checkArgument(dataSize >= 0, "Size of data should be non negative");
    checkArgument(time >= 0, "Time should be non negative");

    if (time == 0) {
        return currentThroughput;
    }

    return currentThroughput = instantThroughput(dataSize, time);
}

// Calculate the amount of data per second
static long instantThroughput(long dataSize, long time) {
    return (long) ((double) dataSize / time * MILLIS_IN_SECOND);
}
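To make the arithmetic concrete, here is a minimal, self-contained sketch of the same idea: accumulate bytes, divide by the elapsed milliseconds, and scale to bytes per second. It is not the Flink class. ThroughputMeterSketch, its incomingDataSize method, and the LongSupplier used as a clock are assumptions made for illustration, and the sketch leaves out the NOT_TRACKED handling shown above.

```java
import java.util.function.LongSupplier;

/** Hypothetical, simplified throughput meter driven by an injected clock. */
class ThroughputMeterSketch {
    private static final long MILLIS_IN_SECOND = 1000;

    private final LongSupplier clockMillis;
    private long measurementStartTime;
    private long accumulatedBytes;
    private long lastThroughput;

    ThroughputMeterSketch(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
        this.measurementStartTime = clockMillis.getAsLong();
    }

    /** Records that some bytes arrived during the current measurement period. */
    void incomingDataSize(long bytes) {
        accumulatedBytes += bytes;
    }

    /** Returns bytes per second for the period since the previous call. */
    long calculateThroughput() {
        long now = clockMillis.getAsLong();
        long elapsed = now - measurementStartTime;
        measurementStartTime = now;
        if (elapsed > 0) {
            lastThroughput = (long) ((double) accumulatedBytes / elapsed * MILLIS_IN_SECOND);
        }
        // With no elapsed time the previous value is reused, as in the time == 0 branch above.
        accumulatedBytes = 0;
        return lastThroughput;
    }

    public static void main(String[] args) {
        long[] now = {0};
        ThroughputMeterSketch meter = new ThroughputMeterSketch(() -> now[0]);
        meter.incomingDataSize(1000);
        now[0] = 500;
        System.out.println(meter.calculateThroughput()); // prints 2000
        System.out.println(meter.calculateThroughput()); // prints 2000 (no time elapsed)
    }
}
```

Here 1000 bytes arriving in 500 ms gives 1000 / 500 * 1000 = 2000 bytes per second.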
Back in SingleInputGate, the getBuffersInUseCount method sums the number of buffers in use across all channels.
int getBuffersInUseCount() {
    int total = 0;
    for (InputChannel channel : channels) {
        total += channel.getBuffersInUseCount();
    }
    return total;
}
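Next we analyze the recalculateBufferSize method of BufferDebloater. It uses an exponential moving average (EMA) to derive a buffer size that changes smoothly. It also compares the relative change between the new and the old buffer size: if the change is too small, the buffer size is left as it is, which avoids the severe performance jitter caused by flip-flopping the buffer size back and forth.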
public OptionalInt recalculateBufferSize(long currentThroughput, int buffersInUse) {
    // Number of buffers actually in use (at least 1)
    int actualBuffersInUse = Math.max(1, buffersInUse);
    // Calculate the desired total buffer size as:
    // current throughput x expected time to fully consume the buffered data
    // (taskmanager.network.memory.buffer-debloat.target)
    long desiredTotalBufferSizeInBytes =
            (currentThroughput * targetTotalBufferSize) / MILLIS_IN_SECOND;

    // Use an exponential moving average (EMA) to calculate the new buffer size
    int newSize =
            bufferSizeEMA.calculateBufferSize(
                    desiredTotalBufferSizeInBytes, actualBuffersInUse);

    // Estimate the time needed to fully consume the buffered data
    lastEstimatedTimeToConsumeBuffers =
            Duration.ofMillis(
                    newSize
                            * actualBuffersInUse
                            * MILLIS_IN_SECOND
                            / Math.max(1, currentThroughput));

    // Do not update the buffer size if the new size is very close to the old one.
    // The allowed change is the old buffer size multiplied by
    // taskmanager.network.memory.buffer-debloat.threshold-percentages;
    // if newSize differs from the old size by less than that, the update is skipped
    boolean skipUpdate = skipUpdate(newSize);

    LOG.debug(
            "Buffer size recalculation: gateIndex={}, currentSize={}, newSize={}, instantThroughput={}, desiredBufferSize={}, buffersInUse={}, estimatedTimeToConsumeBuffers={}, announceNewSize={}",
            gateIndex,
            lastBufferSize,
            newSize,
            currentThroughput,
            desiredTotalBufferSizeInBytes,
            buffersInUse,
            lastEstimatedTimeToConsumeBuffers,
            !skipUpdate);

    // Skip update if the new value pretty close to the old one.
    // Return empty if no update is needed
    if (skipUpdate) {
        return OptionalInt.empty();
    }

    // Return the newly calculated buffer size
    lastBufferSize = newSize;
    return OptionalInt.of(newSize);
}
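The three steps (desired size from throughput and target, EMA smoothing, threshold check) can be followed with concrete numbers in a small standalone sketch. This is a generic illustration and not the implementation of Flink's BufferSizeEMA or skipUpdate, which are not shown in this article. The class BufferSizeSmoothingSketch, the smoothing factor alpha, the min/max clamp, and keeping the EMA state separate from the last announced size are all assumptions.

```java
import java.util.OptionalInt;

/** Hypothetical sketch: desired size -> EMA smoothing -> skip small changes. */
class BufferSizeSmoothingSketch {
    private static final long MILLIS_IN_SECOND = 1000;

    private final long targetMillis;      // assumed: time in which buffered data should be consumed
    private final double alpha;           // assumed: EMA smoothing factor in (0, 1]
    private final int thresholdPercent;   // assumed: minimum relative change worth announcing
    private final int minSize;
    private final int maxSize;
    private int smoothedSize;             // EMA state
    private int announcedSize;            // last size handed out to the channels

    BufferSizeSmoothingSketch(
            long targetMillis, double alpha, int thresholdPercent, int minSize, int maxSize) {
        this.targetMillis = targetMillis;
        this.alpha = alpha;
        this.thresholdPercent = thresholdPercent;
        this.minSize = minSize;
        this.maxSize = maxSize;
        this.smoothedSize = maxSize;
        this.announcedSize = maxSize;
    }

    OptionalInt recalculate(long throughputBytesPerSecond, int buffersInUse) {
        int buffers = Math.max(1, buffersInUse);
        // Total bytes that can be consumed within the target time at the current throughput.
        long desiredTotal = throughputBytesPerSecond * targetMillis / MILLIS_IN_SECOND;
        long desiredPerBuffer = desiredTotal / buffers;
        // EMA step: move a fraction alpha of the way from the old value towards the new one.
        long ema = Math.round(smoothedSize + alpha * (desiredPerBuffer - smoothedSize));
        smoothedSize = (int) Math.max(minSize, Math.min(maxSize, ema));
        // Skip the update when the change relative to the announced size is below the threshold.
        long change = Math.abs(smoothedSize - announcedSize) * 100L;
        if (change < (long) announcedSize * thresholdPercent) {
            return OptionalInt.empty();
        }
        announcedSize = smoothedSize;
        return OptionalInt.of(smoothedSize);
    }

    public static void main(String[] args) {
        BufferSizeSmoothingSketch sketch =
                new BufferSizeSmoothingSketch(1000, 0.5, 20, 256, 32768);
        // 1 MiB/s over 64 buffers with a 1 s target: 16384 bytes desired per buffer.
        System.out.println(sketch.recalculate(1_048_576L, 64)); // prints OptionalInt[24576]
        System.out.println(sketch.recalculate(1_048_576L, 64)); // prints OptionalInt.empty
        System.out.println(sketch.recalculate(1_048_576L, 64)); // prints OptionalInt[18432]
    }
}
```

In the second call the smoothed value moves from 24576 to 20480, a change of about 17% relative to the announced 24576, which is below the assumed 20% threshold, so nothing is announced. By the third call the accumulated change is 25%, so the new size is returned.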