Flink 1.12.2 Source Code Analysis: Task Data Input

Posted by 九师兄




Reposted from: Flink 1.12.2 Source Code Analysis: Task Data Input

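Within a Task, the InputGate encapsulates the input, and each InputGate corresponds one-to-one to a JobEdge in the JobGraph.
In other words, an InputGate represents the upstream operator the Task depends on (including all of its parallel subtasks), and each InputGate consumes one or more ResultPartitions.
An InputGate is made up of InputChannels, and each InputChannel corresponds one-to-one to an ExecutionEdge in the ExecutionGraph; that is, every InputChannel is connected to exactly one ResultSubpartition and receives that subpartition's output. Depending on where the ResultSubpartition it reads is located, an InputChannel has two implementations: LocalInputChannel and RemoteInputChannel.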

2. InputGate

A Task's input is abstracted as an InputGate, and an InputGate consists of InputChannels; each InputChannel corresponds one-to-one to a ResultSubpartition that the Task needs to consume.

An InputGate consumes one or more partitions of a single produced intermediate result. Each intermediate result is partitioned over its producing parallel subtasks, and each of these partitions is further divided into one or more subpartitions.

For example, consider a MapReduce program in which the map operator produces data and the reduce operator consumes the produced data:

   +-----+              +---------------------+              +--------+
   | Map | = produce => | Intermediate Result | <= consume = | Reduce |
   +-----+              +---------------------+              +--------+


                              Intermediate result
                  +-----------------------------------------+
                  |                      +----------------+ |              +-----------------------+
   +-------+     | +-------------+  +=> | Subpartition 1 | | <=======+=== | Input Gate | Reduce 1 |
   | Map 1 | ==> | | Partition 1 | =|   +----------------+ |         |    +-----------------------+
   +-------+     | +-------------+  +=> | Subpartition 2 | | <==+    |
                  |                      +----------------+ |    |    | Subpartition request
                  |                                         |    |    |
                  |                      +----------------+ |    |    |
   +-------+     | +-------------+  +=> | Subpartition 1 | | <==+====+
   | Map 2 | ==> | | Partition 2 | =|   +----------------+ |    |         +-----------------------+
   +-------+     | +-------------+  +=> | Subpartition 2 | | <==+======== | Input Gate | Reduce 2 |
                  |                      +----------------+ |              +-----------------------+
                  +-----------------------------------------+

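In this parallel deployment, two map subtasks produce the intermediate result, giving two partitions (Partition 1 and Partition 2). Each partition is further divided into two subpartitions, one for each parallel reduce subtask, so the input gate of each reduce subtask reads one subpartition from every partition.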
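The figure can be restated as a small mapping. The following sketch assumes an all-to-all connection between map and reduce, as drawn above: the gate of reduce subtask k gets one channel per map partition, and every one of those channels reads subpartition k. This matches the fact that SingleInputGate keeps a single consumedSubpartitionIndex for all of its channels. GateTopologySketch and ChannelSpec are hypothetical names for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the figure (not Flink API): with an all-to-all connection,
// consumer subtask k reads subpartition k of every producer partition.
final class GateTopologySketch {

    /** One input channel: which producer partition and which subpartition it reads. */
    static final class ChannelSpec {
        final int partitionIndex;    // index of the producing map subtask
        final int subpartitionIndex; // index inside that partition

        ChannelSpec(int partitionIndex, int subpartitionIndex) {
            this.partitionIndex = partitionIndex;
            this.subpartitionIndex = subpartitionIndex;
        }

        @Override
        public String toString() {
            // 1-based names to match the figure
            return "Partition " + (partitionIndex + 1) + "/Subpartition " + (subpartitionIndex + 1);
        }
    }

    /** Channels of the input gate of consumer subtask {@code consumerIndex}. */
    static List<ChannelSpec> channelsOf(int consumerIndex, int producerParallelism) {
        List<ChannelSpec> channels = new ArrayList<>();
        for (int p = 0; p < producerParallelism; p++) {
            // one channel per consumed partition; all share the same subpartition index
            channels.add(new ChannelSpec(p, consumerIndex));
        }
        return channels;
    }

    public static void main(String[] args) {
        int maps = 2;
        int reduces = 2;
        for (int r = 0; r < reduces; r++) {
            System.out.println("Reduce " + (r + 1) + " input gate: " + channelsOf(r, maps));
        }
    }
}
```

Running main prints one line per reduce subtask; Reduce 1 lists Subpartition 1 of both partitions, exactly as the arrows in the figure show.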

2.1. Interfaces implemented by InputGate

InputGate is an abstract class that implements three interfaces: PullingAsyncDataInput, AutoCloseable, and ChannelStateHolder.

2.1.1. PullingAsyncDataInput


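For the most efficient usage, users of this class should call pollNext() until it reports that no more elements are available.

When that happens, they should check whether the input isFinished().

If it is not finished, they should wait for the CompletableFuture returned by getAvailableFuture() to complete. For example: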

AsyncDataInput<T> input = ...;
while (!input.isFinished()) {
    Optional<T> next;
    while (true) {
        next = input.pollNext();
        if (!next.isPresent()) {
            break;
        }
        // do something with next
    }
    input.getAvailableFuture().get();
}
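The loop above can be exercised end to end with a toy input. The sketch below defines a hypothetical SimplePullingInput interface with the same three calls (pollNext(), isFinished(), getAvailableFuture()) and a queue-backed QueueInput that a producer thread fills; neither is Flink code, they only illustrate the poll-then-wait contract. It uses join() instead of get() so the loop does not need checked exceptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical simplified contract (not Flink's interface).
interface SimplePullingInput<T> {
    Optional<T> pollNext();                    // non-blocking: empty if nothing is buffered
    boolean isFinished();                      // true once the input is exhausted
    CompletableFuture<?> getAvailableFuture(); // completes when data or end of input arrives
}

// Toy input backed by a queue; a producer thread calls add() and finish().
final class QueueInput<T> implements SimplePullingInput<T> {
    private final ArrayDeque<T> queue = new ArrayDeque<>();
    private boolean finished;
    private CompletableFuture<Void> available = new CompletableFuture<>();

    synchronized void add(T element) {
        queue.add(element);
        available.complete(null); // wake up a consumer waiting on the future
    }

    synchronized void finish() {
        finished = true;
        available.complete(null);
    }

    @Override
    public synchronized Optional<T> pollNext() {
        T next = queue.poll();
        if (queue.isEmpty() && !finished && available.isDone()) {
            available = new CompletableFuture<>(); // drained: become "unavailable" again
        }
        return Optional.ofNullable(next);
    }

    @Override
    public synchronized boolean isFinished() {
        return finished && queue.isEmpty();
    }

    @Override
    public synchronized CompletableFuture<?> getAvailableFuture() {
        return available;
    }
}

final class PollingLoopDemo {
    // The loop from the Javadoc: drain with pollNext(), then wait on the availability future.
    static <T> List<T> drain(SimplePullingInput<T> input) {
        List<T> out = new ArrayList<>();
        while (!input.isFinished()) {
            Optional<T> next;
            while ((next = input.pollNext()).isPresent()) {
                out.add(next.get()); // "do something with next"
            }
            input.getAvailableFuture().join(); // like get(), but throws unchecked exceptions
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        QueueInput<Integer> input = new QueueInput<>();
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                input.add(i);
            }
            input.finish();
        });
        producer.start();
        System.out.println(drain(input)); // prints [1, 2, 3, 4, 5]
        producer.join();
    }
}
```

The key design point is that the availability future is reset only after the queue has been drained, and every add() completes whichever future is current, so the consumer can never wait on a future that will not be completed.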

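PullingAsyncDataInput declares only two methods of its own (getAvailableFuture() comes from the AvailabilityProvider interface it extends):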

Optional<T> pollNext() throws Exception; fetches the next element. This method should be non-blocking.
boolean isFinished(); returns whether the input is finished.

2.1.2. ChannelStateHolder

Implemented by entities that hold any kind of channel state and need a reference to the ChannelStateWriter.

It has a single method, which must be called only once:

/** Injects the {@link ChannelStateWriter}. Must only be called once. */
void setChannelStateWriter(ChannelStateWriter channelStateWriter);
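The "must only be called once" contract can be enforced with a simple state check. A minimal sketch, assuming a hypothetical StateWriter type in place of Flink's ChannelStateWriter; it rejects a second call with IllegalStateException, which may differ from how Flink's own implementations verify this.

```java
// Hypothetical stand-in for Flink's ChannelStateWriter, used only in this sketch.
interface StateWriter {
    void write(String state);
}

// Sketch of a holder whose writer may be injected exactly once.
final class OnceOnlyHolder {
    private StateWriter writer; // null until injected

    void setChannelStateWriter(StateWriter writer) {
        if (writer == null) {
            throw new NullPointerException("writer must not be null");
        }
        if (this.writer != null) {
            throw new IllegalStateException("ChannelStateWriter has already been set");
        }
        this.writer = writer;
    }

    boolean hasWriter() {
        return writer != null;
    }

    public static void main(String[] args) {
        OnceOnlyHolder holder = new OnceOnlyHolder();
        holder.setChannelStateWriter(state -> { });
        try {
            holder.setChannelStateWriter(state -> { }); // second injection is rejected
        } catch (IllegalStateException e) {
            System.out.println("second call rejected: " + e.getMessage());
        }
    }
}
```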

2.1.3. AutoCloseable

It has a single method: void close() throws Exception;

2.2. Method list

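setChannelStateWriter(ChannelStateWriter channelStateWriter): sets the ChannelStateWriter.
abstract int getNumberOfInputChannels(): returns the number of input channels of this gate.
abstract Optional<BufferOrEvent> getNext(): blocking call that waits for the next BufferOrEvent. (The previously returned buffer should have been recycled before the next one is fetched.)
abstract Optional<BufferOrEvent> pollNext(): non-blocking poll for the next BufferOrEvent.
abstract void sendTaskEvent(TaskEvent event): sends a task event.
abstract void resumeConsumption(InputChannelInfo channelInfo): resumes consumption on the given channel after it has been blocked, for example by a checkpoint barrier during aligned checkpointing.
abstract InputChannel getChannel(int channelIndex): returns the channel of this gate at the given index.
List<InputChannelInfo> getChannelInfos(): returns the channel infos of this gate.
CompletableFuture<?> getPriorityEventAvailableFuture(): notifies when a priority event has been enqueued. If this future is queried from the task thread, a priority event is guaranteed to be available and can be retrieved via getNext().
abstract void setup(): sets up the gate; a potentially heavyweight, blocking operation compared to mere construction.
abstract void requestPartitions(): requests consumption of the ResultPartitions.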
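getNext() and pollNext() differ only in whether the caller waits. In SingleInputGate both appear to delegate to one internal method that takes a blocking flag. The toy gate below is a hypothetical sketch of that split (one monitor, String elements instead of BufferOrEvent, wait/notifyAll for the blocking path), not Flink's implementation.

```java
import java.util.ArrayDeque;
import java.util.Optional;

// Toy gate (hypothetical, not Flink code): getNext() waits for data, pollNext() never waits.
final class ToyGate {
    private final ArrayDeque<String> data = new ArrayDeque<>();
    private boolean finished;

    synchronized void push(String element) {
        data.add(element);
        notifyAll(); // wake up a thread blocked in getNext()
    }

    synchronized void finish() {
        finished = true;
        notifyAll();
    }

    Optional<String> getNext() throws InterruptedException {
        return getNextElement(true);
    }

    Optional<String> pollNext() {
        try {
            return getNextElement(false);
        } catch (InterruptedException e) {
            // Not reachable: the non-blocking path never calls wait().
            throw new IllegalStateException(e);
        }
    }

    // Shared logic; the flag only decides whether to wait while nothing is buffered.
    private synchronized Optional<String> getNextElement(boolean blocking)
            throws InterruptedException {
        while (blocking && data.isEmpty() && !finished) {
            wait(); // releases the monitor until push() or finish() calls notifyAll()
        }
        return Optional.ofNullable(data.poll()); // empty if nothing is buffered
    }

    public static void main(String[] args) throws InterruptedException {
        ToyGate gate = new ToyGate();
        System.out.println(gate.pollNext().isPresent()); // false: nothing buffered yet
        new Thread(() -> gate.push("record-1")).start();
        System.out.println(gate.getNext().orElse("<none>")); // waits for the producer, prints record-1
    }
}
```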

3. InputGate implementations

A Task repeatedly calls InputGate.getNextBufferOrEvent to obtain input data and hands that data to the operator it wraps; this loop forms the basic execution logic of a Task.

InputGate has two concrete implementations, SingleInputGate and UnionInputGate; a UnionInputGate is composed of multiple SingleInputGates.
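A UnionInputGate lets one logical input (for example the union of several upstream streams) be read through a single gate interface. The sketch below is a simplified, hypothetical union over plain queues: it polls the member gates round-robin and reports itself finished only when every member is finished. Flink's UnionInputGate instead tracks which member gates have reported available data, so treat this as an illustration of the idea, not of the real scheduling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical member gate: buffered records plus an end-of-input flag.
final class MemberGate {
    final ArrayDeque<String> records = new ArrayDeque<>();
    boolean finished;

    MemberGate(boolean finished, String... records) {
        this.records.addAll(Arrays.asList(records));
        this.finished = finished;
    }

    boolean isFinished() {
        return finished && records.isEmpty();
    }
}

// Sketch of a union over several gates (single-threaded, round-robin polling).
final class ToyUnionGate {
    private final List<MemberGate> gates;
    private int nextGate; // where the next round-robin scan starts

    ToyUnionGate(List<MemberGate> gates) {
        this.gates = new ArrayList<>(gates);
    }

    Optional<String> pollNext() {
        for (int i = 0; i < gates.size(); i++) {
            MemberGate gate = gates.get((nextGate + i) % gates.size());
            String record = gate.records.poll();
            if (record != null) {
                nextGate = (nextGate + i + 1) % gates.size(); // continue after this gate next time
                return Optional.of(record);
            }
        }
        return Optional.empty(); // no member gate currently has data
    }

    boolean isFinished() {
        // the union is finished only when all member gates are finished
        return gates.stream().allMatch(MemberGate::isFinished);
    }

    public static void main(String[] args) {
        ToyUnionGate union = new ToyUnionGate(Arrays.asList(
                new MemberGate(true, "a1", "a2"),
                new MemberGate(true, "b1")));
        List<String> out = new ArrayList<>();
        Optional<String> next;
        while ((next = union.pollNext()).isPresent()) {
            out.add(next.get());
        }
        System.out.println(out + " finished=" + union.isFinished()); // [a1, b1, a2] finished=true
    }
}
```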

3.1. SingleInputGate

3.1.1. IndexedInputGate

SingleInputGate's parent class is IndexedInputGate, which defines several checkpoint-related methods:

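abstract int getGateIndex(): returns the index of this input gate.
void checkpointStarted(CheckpointBarrier barrier): called when a checkpoint starts.
void checkpointStopped(long cancelledCheckpointId): called when a checkpoint is stopped (cancelled).
void blockConsumption(InputChannelInfo channelInfo): unused. The network stack blocks consumption automatically by revoking credit.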

3.1.2. Fields

/** Lock object to guard partition requests and runtime channel updates. */
private final Object requestLock = new Object();

/**
 * The name of the owning task, for logging purposes,
 * e.g. "Flat Map (2/4)#0 (0ef8b3d70af60be8633af8af4e1c0698)".
 */
private final String owningTaskName;

private final int gateIndex;

/**
 * The ID of the consumed intermediate result. Each input gate consumes partitions of the
 * intermediate result specified by this ID. This ID also identifies the input gate at the
 * consuming task. Example value: "5eba1007ad48ad2243891e1eff29c32b".
 */
private final IntermediateDataSetID consumedResultId;

/** The type of the partition the input gate is consuming, e.g. PIPELINED_BOUNDED. */
private final ResultPartitionType consumedPartitionType;

/**
 * The index of the consumed subpartition of each consumed partition. This index depends on the
 * {@link DistributionPattern} and the subtask indices of the producing and consuming task.
 */
private final int consumedSubpartitionIndex;

/** The number of input channels (equivalent to the number of consumed partitions). */
private final int numberOfInputChannels;

/**
 * Input channels, mapping each consumed result partition to its input channel. There is one
 * input channel for each consumed intermediate result partition. We store this in a map for
 * runtime updates of single channels.
 * Example (size = 1): "5eba1007ad48ad2243891e1eff29c32b#0" -> LocalRecoveredInputChannel
 */
private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;

/** All input channels of this gate, indexed by channel index (e.g. [0] = LocalRecoveredInputChannel). */
private final InputChannel[] channels;

/**
 * Channels which notified this input gate about available data, i.e. a queue of the input
 * channels that currently have data to consume (empty, "[]", in the debugger snapshot).
 */
private final PrioritizedDeque<InputChannel> inputChannelsWithData = new PrioritizedDeque<>();

/**
 * Field guaranteeing uniqueness for the inputChannelsWithData queue.
 * Both of those fields should be unified onto one.
 */
private final BitSet enqueuedInputChannelsWithData;

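// Channels that have already received an EndOfPartitionEvent (one bit per channel).
private final BitSet channelsWithEndOfPartitionEvents;

// Sequence number of the last priority event, one entry per channel.
private int[] lastPrioritySequenceNumber;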

/** The partition producer state listener. */
private final PartitionProducerStateProvider partitionProducerStateProvider;

/**
 * Buffer pool (a LocalBufferPool) for incoming buffers. Incoming data from remote channels is
 * copied to buffers from this pool. Debugger snapshot: [size: 8, required: 1, requested: 1,
 * available: 1, max: 8, listeners: 0, subpartitions: 0, maxBuffersPerChannel: 2147483647,
 * destroyed: false]
 */
private BufferPool bufferPool;

private boolean hasReceivedAllEndOfPartitionEvents;

/** Flag indicating whether partitions have been requested. */
private boolean requestedPartitionsFlag;

/** Task events that are still pending, e.g. because some channels are not initialized yet. */
private final List<TaskEvent> pendingEvents = new ArrayList<>();

private int numberOfUninitializedChannels;

/** A timer to retrigger local partition requests. Only initialized if actually needed. */
private Timer retriggerLocalRequestTimer;

// Factory that creates the gate's BufferPool (a SingleInputGateFactory lambda in the debugger).
private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;

private final CompletableFuture<Void> closeFuture;

@Nullable private final BufferDecompressor bufferDecompressor;

// Provider of memory segments (a NetworkBufferPool in the debugger snapshot).
private final MemorySegmentProvider memorySegmentProvider;

/**
 * The segment (a HybridMemorySegment) used by a local input channel to read data from the
 * file region of a bounded blocking partition.
 */
private final MemorySegment unpooledSegment;
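The pair inputChannelsWithData / enqueuedInputChannelsWithData implements a queue without duplicates: when a channel notifies the gate that it has data, it is appended only if its bit is not already set. The sketch below reproduces that idea with an ArrayDeque of channel indices and a java.util.BitSet; it ignores the priority handling of Flink's PrioritizedDeque, and the class name is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.OptionalInt;

// Hypothetical sketch: a FIFO of channel indices in which each channel appears at most once.
final class ChannelsWithDataQueue {
    private final ArrayDeque<Integer> queue = new ArrayDeque<>();
    private final BitSet enqueued = new BitSet(); // bit i set <=> channel i is in the queue

    /** Called when channel {@code index} reports data; returns false if it was already queued. */
    synchronized boolean notifyChannelNonEmpty(int index) {
        if (enqueued.get(index)) {
            return false; // already queued: do not add a duplicate entry
        }
        enqueued.set(index);
        queue.add(index);
        return true;
    }

    /** Takes the next channel to read from and clears its bit so it can be queued again. */
    synchronized OptionalInt poll() {
        Integer index = queue.poll();
        if (index == null) {
            return OptionalInt.empty();
        }
        enqueued.clear(index);
        return OptionalInt.of(index);
    }

    public static void main(String[] args) {
        ChannelsWithDataQueue q = new ChannelsWithDataQueue();
        q.notifyChannelNonEmpty(2);
        q.notifyChannelNonEmpty(0);
        q.notifyChannelNonEmpty(2); // ignored: channel 2 is already queued
        System.out.println(q.poll().getAsInt() + " " + q.poll().getAsInt() + " " + q.poll().isPresent());
        // prints: 2 0 false
    }
}
```

In the real gate, a channel that still has more data after being read appears to be put back into the queue, which is why the bit is cleared on poll.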

3.1.3. setup

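In its setup phase, the InputGate assigns dedicated memory to all of its input channels. Let's look at SingleInputGate's setup method.

In essence, it allocates a LocalBufferPool (LocalBufferPool@8221 in the debugger), and all input channels of the same InputGate share this one LocalBufferPool.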

    public void setup() throws IOException {
        checkState(
                this.bufferPool == null,
                "Bug in input gate setup logic: Already registered buffer pool.");

        // Assign exclusive buffers to all InputChannels; the rest serve as floating buffers
        // ...

        // Create the bufferPool used to hand out floating buffers
        BufferPool bufferPool = bufferPoolFactory.get();
        // ...

        // Request the subpartition that each input channel needs to read
        // ...
    }

    /** Assign the exclusive buffers to all remote input channels directly for credit-based mode. */
    public void setupChannels() throws IOException {
        synchronized (requestLock) {
            for (InputChannel inputChannel : inputChannels.values()) {
                // Call setup() on each InputChannel of this SingleInputGate
                inputChannel.setup();
            }
        }
    }
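The comments above describe two kinds of buffers: exclusive buffers assigned directly to each remote channel for credit-based flow control, and floating buffers shared through the gate's LocalBufferPool. A rough per-gate estimate, assuming the documented Flink defaults of 2 buffers per channel (taskmanager.network.memory.buffers-per-channel), 8 floating buffers per gate (taskmanager.network.memory.floating-buffers-per-gate) and 32 KiB memory segments; the "max: 8" in the bufferPool debugger snapshot is consistent with that floating default. GateBufferEstimate is a hypothetical helper, and the result is an upper bound, not what Flink reports.

```java
// Hypothetical upper-bound estimate of network buffers used by one input gate.
final class GateBufferEstimate {
    static final int DEFAULT_BUFFERS_PER_CHANNEL = 2;       // assumed default exclusive buffers
    static final int DEFAULT_FLOATING_BUFFERS_PER_GATE = 8; // assumed default floating buffers

    /**
     * Exclusive buffers go to remote channels only (local channels read the producer's
     * subpartition directly); floating buffers are shared through the gate's buffer pool.
     */
    static int maxBuffers(int remoteChannels, int buffersPerChannel, int floatingPerGate) {
        return remoteChannels * buffersPerChannel + floatingPerGate;
    }

    static long bytes(int buffers, int segmentSizeBytes) {
        return (long) buffers * segmentSizeBytes;
    }

    public static void main(String[] args) {
        // 4 remote channels * 2 exclusive + 8 floating = 16 buffers
        int buffers = maxBuffers(4, DEFAULT_BUFFERS_PER_CHANNEL, DEFAULT_FLOATING_BUFFERS_PER_GATE);
        System.out.println(buffers + " buffers, " + bytes(buffers, 32 * 1024) + " bytes");
        // prints: 16 buffers, 524288 bytes
    }
}
```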

3.1.4. requestPartitions

    public void requestPartitions() {
        synchronized (requestLock) {

            // Partitions can be requested only once; the flag is set to true after the first call
            if (!requestedPartitionsFlag) {
                if (closeFuture.isDone()) {
                    throw new IllegalStateException("Already released.");
                }

                // Sanity checks
                if (numberOfInputChannels != inputChannels.size()) {
                    throw new IllegalStateException(
                            String.format(
                                    "Bug in input gate setup logic: mismatch between "
                                            + "number of total input channels [%s] and the currently set number of input "
                                            + "channels [%s].",
                                    inputChannels.size(), numberOfInputChannels));
                }

                // Request the partition data
                internalRequestPartitions();
            }

            // Set the flag once the method has run, to prevent repeated requests
            requestedPartitionsFlag = true;
        }
    }

    // Asks every input channel to request its subpartition from the producer
    private void internalRequestPartitions() {
        for (InputChannel inputChannel : inputChannels.values()) {
            try {
                // ...
            } catch (Throwable t) {
                // ...
            }
        }
    }
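The control flow of requestPartitions() is: take requestLock, fail if the gate is already closed, check the channel count, request every channel, and set requestedPartitionsFlag so that later calls do nothing. The sketch below reproduces that flow with hypothetical Channel objects; the failure handling (recording the error on the channel and stopping) is an assumption, since the catch block in the excerpt is truncated.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "request partitions at most once" control flow.
final class PartitionRequester {

    /** Stand-in for an input channel; requesting its subpartition may fail. */
    static final class Channel {
        final String name;
        final boolean failOnRequest;
        int requests;    // how often this channel was asked for its subpartition
        Throwable error; // failure recorded on the channel instead of thrown to the caller

        Channel(String name, boolean failOnRequest) {
            this.name = name;
            this.failOnRequest = failOnRequest;
        }

        void requestSubpartition(int subpartitionIndex) {
            requests++;
            if (failOnRequest) {
                throw new IllegalStateException(name + ": producer not reachable");
            }
        }
    }

    private final Object requestLock = new Object();
    private final List<Channel> channels;
    private final int numberOfInputChannels;
    private final int consumedSubpartitionIndex;
    private boolean requestedPartitionsFlag;
    private boolean closed;

    PartitionRequester(List<Channel> channels, int numberOfInputChannels, int consumedSubpartitionIndex) {
        this.channels = new ArrayList<>(channels);
        this.numberOfInputChannels = numberOfInputChannels;
        this.consumedSubpartitionIndex = consumedSubpartitionIndex;
    }

    void close() {
        synchronized (requestLock) {
            closed = true;
        }
    }

    void requestPartitions() {
        synchronized (requestLock) {
            if (!requestedPartitionsFlag) {
                if (closed) {
                    throw new IllegalStateException("Already released.");
                }
                // Sanity check: every expected channel must have been created.
                if (numberOfInputChannels != channels.size()) {
                    throw new IllegalStateException(String.format(
                            "Expected %s input channels but %s are set.",
                            numberOfInputChannels, channels.size()));
                }
                for (Channel channel : channels) {
                    try {
                        channel.requestSubpartition(consumedSubpartitionIndex);
                    } catch (Throwable t) {
                        channel.error = t; // assumption: record the failure and stop requesting
                        break;
                    }
                }
            }
            // Set after the first call so that later calls are no-ops.
            requestedPartitionsFlag = true;
        }
    }
}
```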

3.1.5. getChannel(int channelIndex)

Returns the InputChannel at the given channelIndex:

public InputChannel getChannel(int channelIndex) {
    return channels[channelIndex];
}

3.1.6. updateInputChannel

Depending on whether the producer is local, converts an UnknownInputChannel into a LocalInputChannel or a RemoteInputChannel.

public void updateInputChannel(
        ResourceID localLocation, NettyShuffleDescriptor shuffleDescriptor)
        throws IOException, InterruptedException {
    synchronized (requestLock) {
        if (closeFuture.isDone()) {
            // There was a race with a task failure/cancel
            return;
        }

        IntermediateResultPartitionID partitionId = ...;

        InputChannel current = inputChannels.get(partitionId);

        // The location of this InputChannel is not known yet...
        if (current instanceof UnknownInputChannel) {
            UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
            boolean isLocal = shuffleDescriptor.isLocalTo(localLocation);
            InputChannel newChannel;
            if (isLocal) {
                // Producer is local: LocalInputChannel
                newChannel = unknownChannel.toLocalInputChannel();
            } else {
                // Producer is remote: RemoteInputChannel
                RemoteInputChannel remoteInputChannel =
                        unknownChannel.toRemoteInputChannel(...);
                // ...
                newChannel = remoteInputChannel;
            }
            LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);

            inputChannels.put(partitionId, newChannel);
            channels[current.getChannelIndex()] = newChannel;

            if (requestedPartitionsFlag) {
                // ...
            }

            for (TaskEvent event : pendingEvents) {
                // ...
            }

            if (--numberOfUninitializedChannels == 0) {
                // ...
            }
        }
    }
}
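updateInputChannel() is how a gate that was created before the producer's location was known gets its real channels: the UnknownInputChannel placeholder becomes a LocalInputChannel when producer and consumer run in the same TaskManager, otherwise a RemoteInputChannel. The sketch below models this with a hypothetical Kind enum and plain string TaskManager IDs in place of ResourceID and NettyShuffleDescriptor.isLocalTo(...); it only tracks the channel kind and the count of uninitialized channels.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of replacing placeholder channels once producer locations are known.
final class ChannelUpdater {
    enum Kind { UNKNOWN, LOCAL, REMOTE }

    private final Map<String, Kind> inputChannels = new HashMap<>(); // partition id -> channel kind
    private final String localTaskManager;
    private int numberOfUninitializedChannels;

    ChannelUpdater(String localTaskManager, String... partitionIds) {
        this.localTaskManager = localTaskManager;
        for (String id : partitionIds) {
            inputChannels.put(id, Kind.UNKNOWN); // location not known at deployment time
        }
        this.numberOfUninitializedChannels = partitionIds.length;
    }

    synchronized void updateInputChannel(String partitionId, String producerTaskManager) {
        Kind current = inputChannels.get(partitionId);
        if (current != Kind.UNKNOWN) {
            return; // already resolved (or unknown partition id): nothing to do
        }
        boolean isLocal = localTaskManager.equals(producerTaskManager);
        inputChannels.put(partitionId, isLocal ? Kind.LOCAL : Kind.REMOTE);
        numberOfUninitializedChannels--;
    }

    synchronized Kind kindOf(String partitionId) {
        return inputChannels.get(partitionId);
    }

    synchronized boolean allChannelsInitialized() {
        return numberOfUninitializedChannels == 0;
    }

    public static void main(String[] args) {
        ChannelUpdater gate = new ChannelUpdater("tm-1", "p0", "p1");
        gate.updateInputChannel("p0", "tm-1");
        gate.updateInputChannel("p1", "tm-2");
        System.out.println(gate.kindOf("p0") + " " + gate.kindOf("p1") + " " + gate.allChannelsInitialized());
        // prints: LOCAL REMOTE true
    }
}
```

Judging from the excerpt, the real method also does more work on the new channel when requestedPartitionsFlag is already set and for every entry of pendingEvents; the sketch leaves both out.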

3.1.7. retriggerPartitionRequest

Retriggers a partition request. It simply calls retriggerSubpartitionRequest on the corresponding InputChannel.

/** Retriggers a partition request. */
public void retriggerPartitionRequest(IntermediateResultPartitionID partitionId)
        throws IOException {
    synchronized (requestLock) {
        if (!closeFuture.isDone()) {
            final InputChannel ch = inputChannels.get(partitionId);

            checkNotNull(ch, "Unknown input channel with ID " + partitionId);

            LOG.debug(
                    "{}: Retriggering partition request {}:{}.",
                    ...);

            if (ch.getClass() == RemoteInputChannel.class) {

                // RemoteInputChannel
                final RemoteInputChannel rch = (RemoteInputChannel) ch;
                rch.retriggerSubpartitionRequest(consumedSubpartitionIndex);
            } else if (ch.getClass() == LocalInputChannel.class) {

                // LocalInputChannel
                final LocalInputChannel ich = (LocalInputChannel) ch;

                // The timer is created lazily, on the first local retrigger
                if (retriggerLocalRequestTimer == null) {
                    retriggerLocalRequestTimer = new Timer(true);
                }

                ich.retriggerSubpartitionRequest(
                        retriggerLocalRequestTimer, consumedSubpartitionIndex);
            } else {
                throw new IllegalStateException(
                        "Unexpected type of channel to retrigger partition: " + ch.getClass());
            }
        }
    }
}

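A retriggered request is not sent again immediately forever: in Flink 1.12 each channel appears to keep a backoff that doubles from an initial to a maximum value, configured by taskmanager.network.request-backoff.initial and taskmanager.network.request-backoff.max (defaults 100 ms and 10,000 ms), and the daemon Timer created above is what a local channel would use to schedule the delayed retry. The class below is a hypothetical sketch of that doubling policy under those assumptions, not Flink's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical doubling backoff for partition request retries (not Flink's class).
final class RequestBackoff {
    private final int initialBackoffMs;
    private final int maxBackoffMs;
    private int currentBackoffMs; // 0 means "no retry yet"

    RequestBackoff(int initialBackoffMs, int maxBackoffMs) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    /** Advances the backoff; returns false once the maximum has already been reached. */
    boolean increaseBackoff() {
        if (currentBackoffMs == 0) {
            currentBackoffMs = initialBackoffMs; // first retry
            return true;
        }
        if (currentBackoffMs < maxBackoffMs) {
            currentBackoffMs = Math.min(currentBackoffMs * 2, maxBackoffMs);
            return true;
        }
        return false; // give up: the caller should fail the request instead of retrying
    }

    int currentBackoffMs() {
        return currentBackoffMs;
    }

    public static void main(String[] args) {
        RequestBackoff backoff = new RequestBackoff(100, 10_000);
        List<Integer> delays = new ArrayList<>();
        while (backoff.increaseBackoff()) {
            delays.add(backoff.currentBackoffMs());
        }
        System.out.println(delays); // [100, 200, 400, 800, 1600, 3200, 6400, 10000]
    }
}
```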
3.1.8. close

Close operation: releases the resources held by each InputChannel, lazily releases the LocalBufferPool, and, if the release succeeds, notifies all…

public void close() throws IOException {
    boolean released = false;
    synchronized (requestLock) {
        if (!closeFuture.isDone()) {
            try {
                LOG.debug("{}: Releasing {}.", owningTaskName, ...);
                // ...
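The close sequence described above (release every channel, then the buffer pool, then complete closeFuture so that waiting threads are notified, and do nothing on a second call) can be sketched as follows. All names are hypothetical; unlike the real method, which appears to destroy the pool lazily (bufferPool.lazyDestroy()) and to catch and log per-channel IOExceptions, the sketch only flips flags. Because InputGate is AutoCloseable, the same object also works in try-with-resources.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of an idempotent close(): channels first, then the buffer pool,
// then the close future that other threads may be waiting on.
final class ClosableGate implements AutoCloseable {

    /** Stand-in for an input channel holding buffers or file readers. */
    static final class Channel {
        boolean released;

        void releaseAllResources() {
            released = true;
        }
    }

    private final Object requestLock = new Object();
    private final List<Channel> channels;
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
    boolean bufferPoolDestroyed; // stands in for destroying the gate's LocalBufferPool

    ClosableGate(List<Channel> channels) {
        this.channels = new ArrayList<>(channels);
    }

    @Override
    public void close() {
        synchronized (requestLock) {
            if (closeFuture.isDone()) {
                return; // already closed: a second call does nothing
            }
            for (Channel channel : channels) {
                channel.releaseAllResources();
            }
            bufferPoolDestroyed = true; // released only after no channel uses it any more
            closeFuture.complete(null); // notify everyone waiting for the gate to close
        }
    }

    CompletableFuture<Void> getCloseFuture() {
        return closeFuture;
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        ClosableGate gate = new ClosableGate(Collections.singletonList(channel));
        gate.close();
        gate.close(); // idempotent
        System.out.println(channel.released + " " + gate.getCloseFuture().isDone()); // true true
    }
}
```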
