Flink 1.12.2 Source Code Walkthrough: Task Data Input

Posted by 九师兄


1. Overview

Reposted from: Flink 1.12.2 Source Code Walkthrough: Task Data Input


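In a Task, an InputGate encapsulates the task's input; InputGates correspond one-to-one to the JobEdges in the JobGraph.
In other words, an InputGate corresponds to an upstream operator that this Task depends on (which comprises multiple parallel subtasks), and each InputGate consumes one or more ResultPartitions.
An InputGate is made up of InputChannels, and InputChannels correspond one-to-one to the ExecutionEdges in the ExecutionGraph. That is, each InputChannel is connected to exactly one ResultSubpartition and receives the output of that ResultSubpartition. Depending on where the ResultSubpartition is located, an InputChannel is implemented as either a LocalInputChannel or a RemoteInputChannel.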

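2. InputGate

A Task's input is abstracted as an InputGate, which in turn consists of InputChannels; each InputChannel corresponds to exactly one ResultSubpartition that the Task needs to consume.

An InputGate consumes one or more partitions of a single produced intermediate result.
Each intermediate result is partitioned over its producing parallel subtasks;
each of these partitions is further partitioned into one or more subpartitions.

As an example, consider a map-reduce program, where the map operator produces data and the reduce operator consumes the produced data.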

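/**
 * An input gate consumes one or more partitions of a single produced intermediate result.
 *
 * Each intermediate result is partitioned over its producing parallel subtasks;
 * each of these partitions is furthermore partitioned into one or more subpartitions.
 *
 * As an example, consider a map-reduce program, where the map operator produces data and the
 * reduce operator consumes the produced data.
 *
 * <pre>{@code
 * +-----+              +---------------------+              +--------+
 * | Map | = produce => | Intermediate Result | <= consume = | Reduce |
 * +-----+              +---------------------+              +--------+
 * }</pre>
 */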

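When such a program is deployed in parallel, the intermediate result is partitioned over its producing parallel subtasks;
each of these partitions is further partitioned into one or more subpartitions.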

/**
 * <pre>{@code
 *                            Intermediate result
 *               +-----------------------------------------+
 *               |                      +----------------+ |              +-----------------------+
 * +-------+     | +-------------+  +=> | Subpartition 1 | | <=======+=== | Input Gate | Reduce 1 |
 * | Map 1 | ==> | | Partition 1 | =|   +----------------+ |         |    +-----------------------+
 * +-------+     | +-------------+  +=> | Subpartition 2 | | <==+    |
 *               |                      +----------------+ |    |    | Subpartition request
 *               |                                         |    |    |
 *               |                      +----------------+ |    |    |
 * +-------+     | +-------------+  +=> | Subpartition 1 | | <==+====+
 * | Map 2 | ==> | | Partition 2 | =|   +----------------+ |    |         +-----------------------+
 * +-------+     | +-------------+  +=> | Subpartition 2 | | <==+======== | Input Gate | Reduce 2 |
 *               |                      +----------------+ |              +-----------------------+
 *               +-----------------------------------------+
 * }</pre>
 *
 */

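In the example above, two map subtasks produce the intermediate result in parallel, resulting in two partitions (Partition 1 and Partition 2).
Each of these partitions is further divided into two subpartitions, one for each parallel reduce subtask.
As shown in the diagram, each reduce task has an input gate attached to it.
This gate provides the task's input, which consists of one subpartition from each partition of the intermediate result.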
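
The wiring in the diagram can be written out as a small toy model. This is only a sketch, not Flink code: the class and method names are hypothetical, and it assumes an all-to-all exchange in which each reduce subtask has one input channel per map partition and reads the subpartition whose index equals its own subtask index.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the wiring in the diagram above; not Flink code. */
public class GateWiringSketch {

    /**
     * Lists the partition/subpartition pairs read by one reduce subtask,
     * assuming one input channel per map partition and a subpartition
     * index equal to the reduce subtask index (both zero-based here).
     */
    static List<String> channelsOf(int reduceIndex, int numMaps) {
        List<String> channels = new ArrayList<>();
        for (int partition = 0; partition < numMaps; partition++) {
            // Labels are printed one-based to match the diagram.
            channels.add("partition " + (partition + 1)
                    + " / subpartition " + (reduceIndex + 1));
        }
        return channels;
    }

    public static void main(String[] args) {
        int numMaps = 2;
        int numReduces = 2;
        for (int r = 0; r < numReduces; r++) {
            System.out.println("Reduce " + (r + 1) + " input gate -> " + channelsOf(r, numMaps));
        }
    }
}
```

With two maps and two reduces, each gate ends up with two channels, which matches the two arrows entering each Input Gate in the diagram.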

2.1. Interfaces Implemented by InputGate

InputGate is an abstract class that implements three interfaces: PullingAsyncDataInput, AutoCloseable, and ChannelStateHolder.
Let's look at each in turn.

2.1.1. PullingAsyncDataInput

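This interface defines two basic methods for asynchronous, non-blocking data polling.

For the most efficient usage, the user of this class should call pollNext() until it returns that no more elements are available.

When that happens, the user should check whether the input isFinished().

If it is not, the user should wait for the CompletableFuture returned by getAvailableFuture() to complete. For example: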

/**
 * <pre>{@code
 * PullingAsyncDataInput<T> input = ...;
 * while (!input.isFinished()) {
 *     Optional<T> next;
 *
 *     while (true) {
 *         next = input.pollNext();
 *         if (!next.isPresent()) {
 *             break;
 *         }
 *         // do something with next
 *     }
 *
 *     input.getAvailableFuture().get();
 * }
 * }</pre>
 */

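Methods
PullingAsyncDataInput itself declares just two methods (getAvailableFuture() is inherited from AvailabilityProvider):

Name | Description
--- | ---
Optional<T> pollNext() throws Exception | Polls the next element; this method should be non-blocking
boolean isFinished() | Whether the input is finished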
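
To make the polling contract concrete, here is a minimal runnable sketch. ToyInput is a hypothetical in-memory stand-in that only mirrors the three method names (pollNext, isFinished, getAvailableFuture); it is not Flink's implementation, and it uses join() instead of get() merely to avoid checked exceptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Toy stand-in for the polling contract described above; not Flink code. */
public class PollingLoopSketch {

    /** In-memory input: a producer calls add(...) and finally finish(). */
    static class ToyInput<T> {
        private final Queue<T> queue = new ArrayDeque<>();
        private boolean endOfInput;
        private CompletableFuture<Void> available = new CompletableFuture<>();

        synchronized void add(T element) {
            queue.add(element);
            available.complete(null); // wake up a waiting consumer
        }

        synchronized void finish() {
            endOfInput = true;
            available.complete(null);
        }

        synchronized Optional<T> pollNext() {
            T next = queue.poll();
            // Once drained (and not finished), hand out a fresh, incomplete future.
            if (queue.isEmpty() && !endOfInput && available.isDone()) {
                available = new CompletableFuture<>();
            }
            return Optional.ofNullable(next);
        }

        synchronized boolean isFinished() {
            return endOfInput && queue.isEmpty();
        }

        synchronized CompletableFuture<Void> getAvailableFuture() {
            return available;
        }
    }

    /** The consumer loop from the Javadoc: poll until empty, then wait on the future. */
    static <T> List<T> drain(ToyInput<T> input) {
        List<T> out = new ArrayList<>();
        while (!input.isFinished()) {
            Optional<T> next;
            while ((next = input.pollNext()).isPresent()) {
                out.add(next.get());
            }
            input.getAvailableFuture().join();
        }
        return out;
    }

    public static void main(String[] args) {
        ToyInput<Integer> input = new ToyInput<>();
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                input.add(i);
            }
            input.finish();
        });
        producer.start();
        System.out.println(drain(input));
    }
}
```

The point of the pattern is that the consumer never blocks inside pollNext(); it only waits on the future when there is nothing left to poll.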

2.1.2. ChannelStateHolder

Implemented by entities that hold any kind of channel state and need a reference to the ChannelStateWriter.

It has a single method, which must only be called once:

/** Injects the {@link ChannelStateWriter}. Must only be called once. */
    void setChannelStateWriter(ChannelStateWriter channelStateWriter);
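
One simple way to honor a "must only be called once" contract is a null check before assignment. The holder below is a hypothetical sketch (a plain Object stands in for ChannelStateWriter); how Flink's own implementations guard the call is not shown in this article.

```java
/** Hypothetical holder enforcing the set-once contract; not Flink code. */
public class SetOnceHolderSketch {
    private Object channelStateWriter; // stands in for ChannelStateWriter

    public void setChannelStateWriter(Object writer) {
        if (writer == null) {
            throw new NullPointerException("writer");
        }
        if (this.channelStateWriter != null) {
            throw new IllegalStateException("Already initialized");
        }
        this.channelStateWriter = writer;
    }

    /** Returns true if a second injection attempt is rejected. */
    static boolean secondCallRejected() {
        SetOnceHolderSketch holder = new SetOnceHolderSketch();
        holder.setChannelStateWriter(new Object());
        try {
            holder.setChannelStateWriter(new Object());
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second call rejected: " + secondCallRejected());
    }
}
```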

2.1.3. AutoCloseable

It has a single method: void close() throws Exception;

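2.2. Method List

Name | Description
--- | ---
setChannelStateWriter(ChannelStateWriter channelStateWriter) | Sets the ChannelStateWriter
abstract int getNumberOfInputChannels() | Returns the number of input channels
abstract Optional<BufferOrEvent> getNext() | Blocking call that waits for the next BufferOrEvent. (Before getting the next buffer, make sure the previously returned buffer has been recycled.)
abstract Optional<BufferOrEvent> pollNext() | Non-blocking call that polls for the next BufferOrEvent
abstract void sendTaskEvent(TaskEvent event) | Sends a task event
abstract void resumeConsumption(InputChannelInfo channelInfo) | Resumes consumption of the given channel
abstract InputChannel getChannel(int channelIndex) | Returns the channel of this gate at the given index
List<InputChannelInfo> getChannelInfos() | Returns the channel infos of this gate
CompletableFuture<?> getPriorityEventAvailableFuture() | Notifies when a priority event has been enqueued. If this future is queried from the task thread, it is guaranteed that a priority event is available and can be retrieved through getNext().
abstract void setup() | Sets up the gate; potentially a heavy-weight, blocking operation compared to mere creation
abstract void requestPartitions() | Requests the ResultPartitions to consume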
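
The difference between the blocking getNext() and the non-blocking pollNext() can be illustrated with a toy gate backed by a BlockingQueue. This is a sketch under that assumption only; strings stand in for BufferOrEvent, and onBuffer is a hypothetical producer-side hook.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy gate showing blocking vs. non-blocking reads; not Flink code. */
public class BlockingVsPollingSketch {
    private final BlockingQueue<String> buffers = new LinkedBlockingQueue<>();

    /** Hypothetical producer-side hook that makes a buffer available. */
    void onBuffer(String buffer) {
        buffers.add(buffer);
    }

    /** Blocking: waits until a buffer is available. */
    Optional<String> getNext() {
        try {
            return Optional.of(buffers.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    /** Non-blocking: returns immediately, possibly with an empty result. */
    Optional<String> pollNext() {
        return Optional.ofNullable(buffers.poll());
    }

    public static void main(String[] args) {
        BlockingVsPollingSketch gate = new BlockingVsPollingSketch();
        System.out.println("poll on empty gate present? " + gate.pollNext().isPresent());
        gate.onBuffer("buffer-0");
        System.out.println("getNext -> " + gate.getNext().get());
    }
}
```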

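3. InputGate Implementations

A Task obtains its input by repeatedly calling InputGate.getNext() or pollNext() (named getNextBufferOrEvent in older Flink versions) and hands the retrieved data to the operator it wraps for processing; this is the basic run loop of a Task.

InputGate has two concrete implementations, SingleInputGate and UnionInputGate; a UnionInputGate is composed of multiple SingleInputGates.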

3.1. SingleInputGate

3.1.1. IndexedInputGate

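The parent class of SingleInputGate is IndexedInputGate, which defines a few checkpoint-related methods:

Name | Description
--- | ---
abstract int getGateIndex() | Returns the index of this input gate
void checkpointStarted(CheckpointBarrier barrier) | Starts the checkpoint
void checkpointStopped(long cancelledCheckpointId) | Stops the checkpoint
getInputGateIndex() | Returns the index of this input gate
void blockConsumption(InputChannelInfo channelInfo) | Unused. The network stack blocks consumption automatically by revoking credits.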

3.1.2. Fields

/** Lock object to guard partition requests and runtime channel updates. */
private final Object requestLock = new Object();

/**
 * The name of the owning task, for logging purposes.
 * Example from a debug session:
 * owningTaskName = "Flat Map (2/4)#0 (0ef8b3d70af60be8633af8af4e1c0698)"
 */
private final String owningTaskName;

private final int gateIndex;

/**
 * The ID of the consumed intermediate result. Each input gate consumes partitions of the
 * intermediate result specified by this ID. This ID also identifies the input gate at the
 * consuming task.
 *
 * Example: {IntermediateDataSetID@8214} "5eba1007ad48ad2243891e1eff29c32b"
 */
private final IntermediateDataSetID consumedResultId;

/**
 * The type of the partition the input gate is consuming.
 * Example: {ResultPartitionType@7380} "PIPELINED_BOUNDED"
 */
private final ResultPartitionType consumedPartitionType;

/**
 * The index of the consumed subpartition of each consumed partition. This index depends on the
 * {@link DistributionPattern} and the subtask indices of the producing and consuming task.
 */
private final int consumedSubpartitionIndex;

/** The number of input channels (equivalent to the number of consumed partitions). */
private final int numberOfInputChannels;

/**
 * Input channels. There is one input channel for each consumed intermediate result partition.
 * We store this in a map (result partition ID -> input channel) for runtime updates of single
 * channels.
 *
 * Example:
 * inputChannels = {HashMap@8215}  size = 1
 *         {IntermediateResultPartitionID@8237} "5eba1007ad48ad2243891e1eff29c32b#0" -> {LocalRecoveredInputChannel@8238}
 */
private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;

/**
 * All input channels of this gate, addressed by channel index.
 *        channels = {InputChannel[1]@8216}
 *              0 = {LocalRecoveredInputChannel@8238}
 */
@GuardedBy("requestLock")
private final InputChannel[] channels;

/**
 * Channels, which notified this input gate about available data, i.e. a queue of the
 * InputChannels that currently have data to consume.
 * Example: inputChannelsWithData = {PrioritizedDeque@8217} "[]"
 */
private final PrioritizedDeque<InputChannel> inputChannelsWithData = new PrioritizedDeque<>();

/**
 * Field guaranteeing uniqueness for inputChannelsWithData queue.
 * Both of those fields should be unified onto one.
 *
 * Example: enqueuedInputChannelsWithData = {BitSet@8218} "{}"
 */
@GuardedBy("inputChannelsWithData")
private final BitSet enqueuedInputChannelsWithData;

// Channels from which an end-of-partition event has been received
private final BitSet channelsWithEndOfPartitionEvents;

// Last priority sequence number
@GuardedBy("inputChannelsWithData")
private int[] lastPrioritySequenceNumber;

/** The partition producer state listener. */
private final PartitionProducerStateProvider partitionProducerStateProvider;

/**
 * Buffer pool for incoming buffers. Incoming data from remote channels is copied to buffers
 * from this pool. At runtime this is a LocalBufferPool, for example:
 * {LocalBufferPool@8221} "[size: 8, required: 1, requested: 1, available: 1, max: 8, listeners: 0,subpartitions: 0, maxBuffersPerChannel: 2147483647, destroyed: false]"
 */
private BufferPool bufferPool;

private boolean hasReceivedAllEndOfPartitionEvents;

/** Flag indicating whether partitions have been requested. */
private boolean requestedPartitionsFlag;

/** Pending task events. */
private final List<TaskEvent> pendingEvents = new ArrayList<>();

// Number of channels that have not been initialized yet
private int numberOfUninitializedChannels;

/** A timer to retrigger local partition requests. Only initialized if actually needed. */
private Timer retriggerLocalRequestTimer;

// Factory that creates the buffer pool
// Example: {SingleInputGateFactory$lambda@8223}
private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;

private final CompletableFuture<Void> closeFuture;

@Nullable private final BufferDecompressor bufferDecompressor;

// Example: {NetworkBufferPool@7512}
private final MemorySegmentProvider memorySegmentProvider;

/**
 * The segment to read data from file region of bounded blocking partition by local input
 * channel.
 * Example: {HybridMemorySegment@8225}
 */
private final MemorySegment unpooledSegment;
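
The pairing of inputChannelsWithData with enqueuedInputChannelsWithData (a queue plus a BitSet that guarantees each channel is queued at most once) can be sketched as follows. This is a simplified toy: channel indices stand in for InputChannel objects, a plain ArrayDeque replaces PrioritizedDeque, and the method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.BitSet;

/** Toy "queue plus BitSet" structure; not Flink code. */
public class ChannelsWithDataSketch {
    private final ArrayDeque<Integer> channelsWithData = new ArrayDeque<>();
    private final BitSet enqueued;

    ChannelsWithDataSketch(int numberOfChannels) {
        this.enqueued = new BitSet(numberOfChannels);
    }

    /** Returns true if the channel was newly enqueued, false if it was already waiting. */
    synchronized boolean notifyDataAvailable(int channelIndex) {
        if (enqueued.get(channelIndex)) {
            return false; // already in the queue: keep it unique
        }
        enqueued.set(channelIndex);
        channelsWithData.add(channelIndex);
        return true;
    }

    /** Returns the next channel index with data, or -1 if none is queued. */
    synchronized int pollChannel() {
        Integer next = channelsWithData.poll();
        if (next == null) {
            return -1;
        }
        enqueued.clear(next);
        return next;
    }

    public static void main(String[] args) {
        ChannelsWithDataSketch gate = new ChannelsWithDataSketch(4);
        System.out.println(gate.notifyDataAvailable(2)); // newly enqueued
        System.out.println(gate.notifyDataAvailable(2)); // duplicate notification ignored
        System.out.println(gate.pollChannel());
        System.out.println(gate.pollChannel());
    }
}
```

The BitSet makes the "is this channel already queued?" check constant-time, which a scan over the queue would not be.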

3.1.3. setup

During setup, the InputGate assigns exclusive buffers to all of its input channels; see SingleInputGate.setup() below.

It then creates the LocalBufferPool. All input channels of the same InputGate share this one LocalBufferPool.

@Override
public void setup() throws IOException {
    checkState(
            this.bufferPool == null,
            "Bug in input gate setup logic: Already registered buffer pool.");

    // Assign exclusive buffers to all InputChannels
    setupChannels();

    // Create the buffer pool that provides the floating buffers
    BufferPool bufferPool = bufferPoolFactory.get();

    // Register the buffer pool with this gate
    setBufferPool(bufferPool);
}

/** Assign the exclusive buffers to all remote input channels directly for credit-based mode. */
@VisibleForTesting
public void setupChannels() throws IOException {
    synchronized (requestLock) {
        for (InputChannel inputChannel : inputChannels.values()) {
            // Call setup() on every InputChannel of this SingleInputGate
            inputChannel.setup();
        }
    }
}
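
As a rough worked example of the split between exclusive and floating buffers: if every remote channel owns a fixed number of exclusive buffers and the gate's shared pool provides the floating ones, the total a gate can draw on is a simple sum. The helper below is a hypothetical sketch, and the values 2 (exclusive per channel) and 8 (floating per gate) are illustrative assumptions rather than figures taken from this article's code.

```java
/** Toy arithmetic for a gate's buffer budget; names and numbers are assumptions. */
public class GateBufferBudgetSketch {

    /** Exclusive buffers are counted per remote channel, floating buffers once per gate. */
    static int buffersAvailableToGate(
            int remoteChannels, int exclusivePerChannel, int floatingPerGate) {
        return remoteChannels * exclusivePerChannel + floatingPerGate;
    }

    public static void main(String[] args) {
        // 4 remote channels * 2 exclusive buffers + 8 floating buffers
        System.out.println(buffersAvailableToGate(4, 2, 8));
    }
}
```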

3.1.4. requestPartitions

    // Request the partitions
    @Override
    public void requestPartitions() {
        synchronized (requestLock) {

            // Partitions may only be requested once; the flag is set to true after the first call
            if (!requestedPartitionsFlag) {
                if (closeFuture.isDone()) {
                    throw new IllegalStateException("Already released.");
                }

                // Sanity checks
                if (numberOfInputChannels != inputChannels.size()) {
                    throw new IllegalStateException(
                            String.format(
                                    "Bug in input gate setup logic: mismatch between "
                                            + "number of total input channels [%s] and the currently set number of input "
                                            + "channels [%s].",
                                    inputChannels.size(), numberOfInputChannels));
                }

                convertRecoveredInputChannels();

                // Request the partition data
                internalRequestPartitions();
            }

            // Set the flag so that repeated calls do nothing
            requestedPartitionsFlag = true;
        }
    }

    // Request the consumed subpartition from every input channel
    private void internalRequestPartitions() {
        for (InputChannel inputChannel : inputChannels.values()) {
            try {
                // Each channel requests its corresponding subpartition
                inputChannel.requestSubpartition(consumedSubpartitionIndex);
            } catch (Throwable t) {
                inputChannel.setError(t);
                return;
            }
        }
    }
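
The request-once guard used by requestPartitions() (a boolean flag checked and set under a lock) can be isolated in a small toy class. This is a sketch only: channels are plain strings, and the request log is a hypothetical stand-in for the real subpartition requests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy version of the request-once guard; not Flink code. */
public class RequestOnceSketch {
    private final Object requestLock = new Object();
    private final List<String> channels;
    private final int consumedSubpartitionIndex;
    private final List<String> sentRequests = new ArrayList<>();
    private boolean requestedPartitionsFlag;

    RequestOnceSketch(List<String> channels, int consumedSubpartitionIndex) {
        this.channels = channels;
        this.consumedSubpartitionIndex = consumedSubpartitionIndex;
    }

    void requestPartitions() {
        synchronized (requestLock) {
            if (!requestedPartitionsFlag) {
                for (String channel : channels) {
                    // Every channel asks for the same subpartition index of its own partition.
                    sentRequests.add(channel + "/subpartition-" + consumedSubpartitionIndex);
                }
            }
            requestedPartitionsFlag = true;
        }
    }

    List<String> sentRequests() {
        synchronized (requestLock) {
            return new ArrayList<>(sentRequests);
        }
    }

    public static void main(String[] args) {
        RequestOnceSketch gate =
                new RequestOnceSketch(Arrays.asList("partition-0", "partition-1"), 1);
        gate.requestPartitions();
        gate.requestPartitions(); // second call is a no-op
        System.out.println(gate.sentRequests());
    }
}
```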

3.1.5. getChannel(int channelIndex)

Returns the InputChannel at the given channelIndex.

@Override
public InputChannel getChannel(int channelIndex) {
    return channels[channelIndex];
}

3.1.6. updateInputChannel

Converts an UnknownInputChannel into a LocalInputChannel or a RemoteInputChannel, depending on whether the producing partition is local to the consumer's location.

public void updateInputChannel(
            ResourceID localLocation, NettyShuffleDescriptor shuffleDescriptor)
            throws IOException, InterruptedException {
        synchronized (requestLock) {
            if (closeFuture.isDone()) {
                // There was a race with a task failure/cancel
                return;
            }

            IntermediateResultPartitionID partitionId =
                    shuffleDescriptor.getResultPartitionID().getPartitionId();

            InputChannel current = inputChannels.get(partitionId);

            // The type of this InputChannel has not been determined yet
            if (current instanceof UnknownInputChannel) {
                UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
                boolean isLocal = shuffleDescriptor.isLocalTo(localLocation);
                InputChannel newChannel;
                if (isLocal) {
                    // LocalInputChannel
                    newChannel = unknownChannel.toLocalInputChannel();
                } else {
                    // RemoteInputChannel
                    RemoteInputChannel remoteInputChannel =
                            unknownChannel.toRemoteInputChannel(
                                    shuffleDescriptor.getConnectionId());
                    remoteInputChannel.setup();
                    newChannel = remoteInputChannel;
                }
                LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);

                inputChannels.put(partitionId, newChannel);
                channels[current.getChannelIndex()] = newChannel;

                if (requestedPartitionsFlag) {
                    newChannel.requestSubpartition(consumedSubpartitionIndex);
                }

                for (TaskEvent event : pendingEvents) {
                    newChannel.sendTaskEvent(event);
                }

                if (--numberOfUninitializedChannels == 0) {
                    pendingEvents.clear();
                }
            }
        }
    }
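
The control flow of updateInputChannel can be reduced to a toy state machine: resolve an unknown channel to local or remote, replay pending events to it, and drop the pending events once no unknown channels remain. The class below is a hypothetical sketch with strings for locations and events; it leaves out the subpartition request and all real channel setup.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of resolving unknown channels; not Flink code. */
public class UpdateChannelSketch {
    enum Kind { UNKNOWN, LOCAL, REMOTE }

    private final String localLocation;
    private final Kind[] channels;
    private final List<String> pendingEvents = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();
    private int numberOfUninitializedChannels;

    UpdateChannelSketch(String localLocation, int numberOfChannels) {
        this.localLocation = localLocation;
        this.channels = new Kind[numberOfChannels];
        Arrays.fill(channels, Kind.UNKNOWN);
        this.numberOfUninitializedChannels = numberOfChannels;
    }

    /** Remembers an event that must reach channels resolved later. */
    synchronized void bufferEvent(String event) {
        pendingEvents.add(event);
    }

    synchronized Kind update(int channelIndex, String producerLocation) {
        if (channels[channelIndex] != Kind.UNKNOWN) {
            return channels[channelIndex]; // already resolved, nothing to do
        }
        Kind resolved = localLocation.equals(producerLocation) ? Kind.LOCAL : Kind.REMOTE;
        channels[channelIndex] = resolved;
        for (String event : pendingEvents) {
            delivered.add(channelIndex + ":" + event); // replay to the new channel
        }
        if (--numberOfUninitializedChannels == 0) {
            pendingEvents.clear(); // every channel has seen the events
        }
        return resolved;
    }

    synchronized List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    synchronized int pendingCount() {
        return pendingEvents.size();
    }

    public static void main(String[] args) {
        UpdateChannelSketch gate = new UpdateChannelSketch("tm-1", 2);
        gate.bufferEvent("e1");
        System.out.println(gate.update(0, "tm-1"));
        System.out.println(gate.update(1, "tm-2"));
        System.out.println(gate.delivered() + ", pending=" + gate.pendingCount());
    }
}
```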

3.1.7. retriggerPartitionRequest

Retriggers a partition request. In practice this just calls retriggerSubpartitionRequest on the corresponding InputChannel.

/** Retriggers a partition request. */
public void retriggerPartitionRequest(IntermediateResultPartitionID partitionId)
        throws IOException {
    synchronized (requestLock) {
        if (!closeFuture.isDone()) {
            final InputChannel ch = inputChannels.get(partitionId);

            checkNotNull(ch, "Unknown input channel with ID " + partitionId);

            LOG.debug(
                    "{}: Retriggering partition request {}:{}.",
                    owningTaskName,
                    ch.partitionId,
                    consumedSubpartitionIndex);

            if (ch.getClass() == RemoteInputChannel.class) {

                // RemoteInputChannel
                final RemoteInputChannel rch = (RemoteInputChannel) ch;
                rch.retriggerSubpartitionRequest(consumedSubpartitionIndex);
            } else if (ch.getClass() == LocalInputChannel.class) {

                // LocalInputChannel
                final LocalInputChannel ich = (LocalInputChannel) ch;

                if (retriggerLocalRequestTimer == null) {
                    retriggerLocalRequestTimer = new Timer(true);
                }

                ich.retriggerSubpartitionRequest(
                        retriggerLocalRequestTimer, consumedSubpartitionIndex);
            } else {
                throw new IllegalStateException(
                        "Unexpected type of channel to retrigger partition: " + ch.getClass());
            }
        }
    }
}
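
The local branch above creates a daemon java.util.Timer lazily and passes it to the channel. What the channel does with it is not shown in this article; the sketch below assumes it schedules the retry as a delayed TimerTask, and all names in it are hypothetical.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Toy lazily created retry timer; not Flink code. */
public class RetriggerTimerSketch {
    private Timer retriggerTimer; // created on first use only

    synchronized Timer timer() {
        if (retriggerTimer == null) {
            retriggerTimer = new Timer(true); // daemon thread, does not block JVM exit
        }
        return retriggerTimer;
    }

    /** Schedules the given request to run once after the delay. */
    void retriggerLater(final Runnable request, long delayMillis) {
        timer().schedule(new TimerTask() {
            @Override
            public void run() {
                request.run();
            }
        }, delayMillis);
    }

    /** Returns true if a scheduled retry fires before the timeout expires. */
    static boolean firesWithin(long delayMillis, long timeoutMillis) {
        RetriggerTimerSketch gate = new RetriggerTimerSketch();
        final CountDownLatch fired = new CountDownLatch(1);
        gate.retriggerLater(fired::countDown, delayMillis);
        try {
            return fired.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("retry fired: " + firesWithin(50, 2000));
    }
}
```

Creating the Timer only when a local retry is actually needed avoids starting an extra thread for gates that never retrigger.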

3.1.8. close

Closes the gate: releases the resources of every InputChannel, lazily releases the LocalBufferPool, and, if the release succeeds, notifies all…

@Override
public void close() throws IOException {
    boolean released = false;
    synchronized (requestLock) {
        if (!closeFuture.isDone()) {
            try {
                LOG.debug("{}: Releasing {}.", owningTaskName, 
