FlinkFlink 源码之Buffer Debloating

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink 源码之Buffer Debloating相关的知识,希望对你有一定的参考价值。

1.概述

转载:【Flink】Flink 源码之Buffer Debloating

2.什么是Buffer debloating

Buffer Debloating是Flink 1.14新增的优化方式。它能够根据指标(buffer数据被全部消费的期望等待时间taskmanager.network.memory.buffer-debloat.target)自动推算和控制in-flight data(operator输入队列和输出队列缓存的数据)大小,从而减少checkpoint耗时,减少checkpoint存储大小和恢复时间(因为in-flight data的量减少了)。对于Unaligned Checkpoint效果更为显著。启用和配置方式参见Flink 使用之状态和checkpoint。

3.buffer debloating 源代码解析

StreamTask摄入数据的时候,schedule一个buffer debloater定时任务,触发间隔时间为taskmanager.network.memory.buffer-debloat.period

@Override
public final void invoke() throws Exception 
    // Allow invoking method 'invoke' without having to call 'restore' before it.
    if (!isRunning) 
        LOG.debug("Restoring during invoke will be called.");
        restoreInternal();
    

    // final check to exit early before starting to run
    ensureNotCanceled();

    // 创建buffer debloating定时任务
    scheduleBufferDebloater();

    // let the task do its work
    runMailboxLoop();

    // if this left the run() method cleanly despite the fact that this was canceled,
    // make sure the "clean shutdown" is not attempted
    ensureNotCanceled();

    afterInvoke();

scheduleBufferDebloater方法在systemTimerService注册一个定时任务,周期性触发debloat任务。

private void scheduleBufferDebloater() 
    // See https://issues.apache.org/jira/browse/FLINK-23560
    // If there are no input gates, there is no point of calculating the throughput and running
    // the debloater. At the same time, for SourceStreamTask using legacy sources and checkpoint
    // lock, enqueuing even a single mailbox action can cause performance regression. This is
    // especially visible in batch, with disabled checkpointing and no processing time timers.
    
    // 如果没有inputGate或者没有启用buffer debloating,直接返回
    if (getEnvironment().getAllInputGates().length == 0
            || !environment
                    .getTaskManagerInfo()
                    .getConfiguration()
                    .getBoolean(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED)) 
        return;
    
    // 注册一个事件,在buffer debloat间隔时间之后调用debloat方法
    // buffer debloat间隔时间由配置项taskmanager.network.memory.buffer-debloat.period决定
    systemTimerService.registerTimer(
            systemTimerService.getCurrentProcessingTime() + bufferDebloatPeriod,
            timestamp ->
                    mainMailboxExecutor.execute(
                            () -> 
                                debloat();
                                // 再schedule一个作业,实现周期定时调用
                                scheduleBufferDebloater();
                            ,
                            "Buffer size recalculation"));

到此我们可以得知,debloat主要执行逻辑在debloat方法中。

void debloat() 
    for (IndexedInputGate inputGate : environment.getAllInputGates()) 
        inputGate.triggerDebloating();
    

debloat方法依次调用所有inputGate的triggerDebloating方法。

我们查看SingleInputGate的triggerDebloating方法。

@Override
public void triggerDebloating() 
    if (isFinished() || closeFuture.isDone()) 
        return;
    

    checkState(bufferDebloater != null, "Buffer debloater should not be null");
    final long currentThroughput = throughputCalculator.calculateThroughput();
    bufferDebloater
            // 重新计算buffer大小
            .recalculateBufferSize(currentThroughput, getBuffersInUseCount())
            // 如果返回的不为empty,说明需要更新buffer size
            // 设置各个channel的buffer size
            .ifPresent(this::announceBufferSize);

首先我们分析吞吐量计算器ThroughputCalculator的calculateThroughput方法,用于计算间隔时间内的吞吐量。

public long calculateThroughput() 
    if (measurementStartTime != NOT_TRACKED) 
        // 获取当前时间
        long absoluteTimeMillis = clock.absoluteTimeMillis();
        
        // 获取计量吞吐量期间时长
        currentMeasurementTime += absoluteTimeMillis - measurementStartTime;
        
        // 设置下一个计量起始时间
        measurementStartTime = absoluteTimeMillis;
    

    // 计算吞吐量,方法参数为这段时间累积的数据量和时长
    long throughput = calculateThroughput(currentAccumulatedDataSize, currentMeasurementTime);

    // 变量重置
    currentAccumulatedDataSize = currentMeasurementTime = 0;

    return throughput;


public long calculateThroughput(long dataSize, long time) 
    checkArgument(dataSize >= 0, "Size of data should be non negative");
    checkArgument(time >= 0, "Time should be non negative");

    if (time == 0) 
        return currentThroughput;
    

    return currentThroughput = instantThroughput(dataSize, time);


// 计算每秒的数据量
static long instantThroughput(long dataSize, long time) 
    return (long) ((double) dataSize / time * MILLIS_IN_SECOND);

我们回到SingleInputGate的getBuffersInUseCount方法,它统计各个channel已使用的buffer数量总和。

int getBuffersInUseCount() 
    int total = 0;
    for (InputChannel channel : channels) 
        total += channel.getBuffersInUseCount();
    
    return total;

接下来该分析BufferDebloater的recalculateBufferSize方法。它使用指数滑动平均(EMA)算法,推算出一个更为平滑过度的buffer值。它还能够对比新旧buffer size值的变化率,如果变化率过小,不修改buffer size,从而避免了来回拉锯式频繁修改buffer大小造成性能剧烈抖动。

public OptionalInt recalculateBufferSize(long currentThroughput, int buffersInUse) 
    // 当前实际buffer使用量
    int actualBuffersInUse = Math.max(1, buffersInUse);
    // 计算期待的buffer大小,计算公式为:
    // 当前吞吐量 x buffer数据被全部消费的期望等待时间(taskmanager.network.memory.buffer-debloat.target)
    long desiredTotalBufferSizeInBytes =
            (currentThroughput * targetTotalBufferSize) / MILLIS_IN_SECOND;

    // 使用指数滑动平均算法(Exponential moving average),计算新的buffer大小
    int newSize =
            bufferSizeEMA.calculateBufferSize(
                    desiredTotalBufferSizeInBytes, actualBuffersInUse);

    // 估算buffer数据完全消费的所需时间
    lastEstimatedTimeToConsumeBuffers =
            Duration.ofMillis(
                    newSize
                            * actualBuffersInUse
                            * MILLIS_IN_SECOND
                            / Math.max(1, currentThroughput));

    // 如果新计算出的大小和旧的很接近,不更新buffer大小
    // 旧buffer大小乘以taskmanager.network.memory.buffer-debloat.threshold-percentages计算出变化量
    // 如果newSize和旧buffer大小差异值小于变化量,则不更新buffer大小
    boolean skipUpdate = skipUpdate(newSize);

    LOG.debug(
            "Buffer size recalculation: gateIndex=, currentSize=, newSize=, instantThroughput=, desiredBufferSize=, buffersInUse=, estimatedTimeToConsumeBuffers=, announceNewSize=",
            gateIndex,
            lastBufferSize,
            newSize,
            currentThroughput,
            desiredTotalBufferSizeInBytes,
            buffersInUse,
            lastEstimatedTimeToConsumeBuffers,
            !skipUpdate);

    // Skip update if the new value pretty close to the old one.
    // 如果不需要更新,返回empty
    if (skipUpdate) 
        return OptionalInt.empty();
    

    // 返回新计算的buffer大小
    lastBufferSize = newSize;
    return OptionalInt.of(newSize);

以上是关于FlinkFlink 源码之Buffer Debloating的主要内容,如果未能解决你的问题,请参考以下文章

FlinkFlink 源码之快照

FlinkFlink 源码之时间处理

FlinkFlink 源码之ExecutionGraph

FlinkFlink 源码之RPC调用

FlinkFlink 源码之AsyncFunction异步 IO 源码

FlinkFlink 源码之OperatorChain