[flink] Flink 1.12.2 Source Code Walkthrough: Task Data Input
Posted by 九师兄
1. Overview
Reposted from: Flink 1.12.2 Source Code Walkthrough: Task Data Input
Inside a Task, the input is abstracted as an InputGate; InputGates correspond one-to-one with the JobEdges of the JobGraph.
In other words, an InputGate represents the upstream operator (with all of its parallel subtasks) that this Task depends on, and each InputGate consumes one or more ResultPartitions.
An InputGate is composed of InputChannels, and InputChannels correspond one-to-one with the ExecutionEdges of the ExecutionGraph. That is, an InputChannel is wired to exactly one ResultSubpartition and receives that subpartition's output. Depending on where the consumed ResultSubpartition lives, there are two InputChannel implementations: LocalInputChannel and RemoteInputChannel.
2. InputGate
A Task's input is abstracted as an InputGate, and an InputGate is in turn composed of InputChannels; each InputChannel corresponds one-to-one with a ResultSubpartition that the Task consumes.
An InputGate consumes one or more partitions of a single produced intermediate result.
Each intermediate result is partitioned over the parallel subtasks producing it;
each of these partitions is further split into one or more subpartitions.
As an example, consider a map-reduce program, where the map operator produces data and the reduce operator consumes it.
/**
 * An input gate consumes one or more partitions of a single produced intermediate result.
 *
 * Each intermediate result is partitioned over its producing parallel subtasks;
 *
 * each of these partitions is furthermore partitioned into one or more subpartitions.
 *
 * As an example, consider a map-reduce program, where the map operator produces data and the
 * reduce operator consumes the produced data.
 *
 * <pre>{@code
 * +-----+              +---------------------+              +--------+
 * | Map | = produce => | Intermediate Result | <= consume = | Reduce |
 * +-----+              +---------------------+              +--------+
 * }</pre>
 */
When such a program is deployed in parallel, the intermediate result is partitioned over its producing parallel subtasks;
each of these partitions is further split into one or more subpartitions.
 * <pre>{@code
 *                            Intermediate result
 *               +-----------------------------------------+
 *               |                      +----------------+ |              +-----------------------+
 * +-------+     | +-------------+  +=> | Subpartition 1 | | <=======+=== | Input Gate | Reduce 1 |
 * | Map 1 | ==> | | Partition 1 | =|   +----------------+ |         |    +-----------------------+
 * +-------+     | +-------------+  +=> | Subpartition 2 | | <==+    |
 *               |                      +----------------+ |    |    | Subpartition request
 *               |                                         |    |    |
 *               |                      +----------------+ |    |    |
 * +-------+     | +-------------+  +=> | Subpartition 1 | | <==+====+
 * | Map 2 | ==> | | Partition 2 | =|   +----------------+ |    |         +-----------------------+
 * +-------+     | +-------------+  +=> | Subpartition 2 | | <==+======== | Input Gate | Reduce 2 |
 *               |                      +----------------+ |              +-----------------------+
 *               +-----------------------------------------+
 * }</pre>
*
*/
In the example above, two parallel map subtasks produce the intermediate result, yielding two partitions (Partition 1 and Partition 2).
Each partition is further split into two subpartitions, one for each parallel reduce subtask.
As shown in the figure, each reduce task has an input gate attached to it.
That input gate provides the task's input, which consists of one subpartition from each partition of the intermediate result.
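To make the counting in this example concrete, here is a tiny back-of-the-envelope sketch (plain Java, not Flink API; the class and variable names are made up): with this all-to-all wiring, each reduce subtask's gate gets one input channel per map subtask, and every one of those channels reads the subpartition whose index equals the reduce subtask's own index.
public class GateWiringSketch {
    public static void main(String[] args) {
        int mapParallelism = 2;    // Map 1, Map 2 -> Partition 1, Partition 2
        int reduceParallelism = 2; // Reduce 1, Reduce 2

        for (int reduceSubtask = 0; reduceSubtask < reduceParallelism; reduceSubtask++) {
            // One input channel per consumed partition ...
            int numberOfInputChannels = mapParallelism;
            // ... and each channel reads the subpartition matching the consumer's subtask index.
            int consumedSubpartitionIndex = reduceSubtask;
            System.out.printf(
                    "Reduce %d: inputChannels=%d, consumedSubpartitionIndex=%d%n",
                    reduceSubtask + 1, numberOfInputChannels, consumedSubpartitionIndex);
        }
    }
}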
2.1. Interfaces implemented by InputGate
InputGate is an abstract class; it implements three interfaces: PullingAsyncDataInput, AutoCloseable, and ChannelStateHolder.
Let's focus on the important parts.
2.1.1. PullingAsyncDataInput
The interface defines two basic methods for asynchronous and non-blocking data polling.
For the most efficient use, the caller should keep invoking {@link #pollNext()} until it reports that no more elements are available.
When that happens, the caller should check the input via {@link #isFinished()}.
If it is not finished, the caller should wait for the {@link CompletableFuture} returned by {@link #getAvailableFuture()} to complete. For example:
/**
 * <pre>{@code
 * AsyncDataInput<T> input = ...;
 * while (!input.isFinished()) {
 *     Optional<T> next;
 *
 *     while (true) {
 *         next = input.pollNext();
 *         if (!next.isPresent()) {
 *             break;
 *         }
 *         // do something with next
 *     }
 *
 *     input.getAvailableFuture().get();
 * }
 * }</pre>
 */
Methods
PullingAsyncDataInput declares just two methods:
Name | Description |
---|---|
Optional<T> pollNext() throws Exception | Returns the next element, or an empty Optional if none is available right now; this method should be non-blocking. |
boolean isFinished() | Whether the input has finished. |
2.1.2. ChannelStateHolder
Implemented by entities that hold channel state of any kind and need a reference to the {@link ChannelStateWriter}.
It has just one method, which must only be called once:
/** Injects the {@link ChannelStateWriter}. Must only be called once. */
void setChannelStateWriter(ChannelStateWriter channelStateWriter);
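As a minimal sketch of how an implementer might honor that single-call contract (the class name and the guard are my own illustration, not copied from Flink; imports of ChannelStateHolder / ChannelStateWriter are omitted since their exact packages depend on the Flink version):
class MyChannelStateHolder implements ChannelStateHolder {

    private ChannelStateWriter channelStateWriter;

    @Override
    public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
        // Enforce the "must only be called once" contract from the interface javadoc.
        if (this.channelStateWriter != null) {
            throw new IllegalStateException("ChannelStateWriter was already injected.");
        }
        this.channelStateWriter = channelStateWriter;
    }
}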
2.1.3. AutoCloseable
It has only a single method: void close() throws Exception.
2.2. Method overview
Name | Description |
---|---|
setChannelStateWriter(ChannelStateWriter channelStateWriter) | Injects the ChannelStateWriter. |
abstract int getNumberOfInputChannels() | Returns the number of input channels of this gate. |
abstract Optional<BufferOrEvent> getNext() | Blocking call that waits for the next {@link BufferOrEvent}. (The previously returned buffer should be recycled before the next one is requested.) |
abstract Optional<BufferOrEvent> pollNext() | Non-blocking poll for the next {@link BufferOrEvent}. |
abstract void sendTaskEvent(TaskEvent event) | Sends a task event. |
abstract void resumeConsumption(InputChannelInfo channelInfo) | Resumes data consumption from the given channel (e.g. after it was blocked during checkpoint alignment). |
abstract InputChannel getChannel(int channelIndex) | Returns the channel of this gate at the given index. |
List<InputChannelInfo> getChannelInfos() | Returns the channel infos of this gate. |
CompletableFuture<?> getPriorityEventAvailableFuture() | Notifies when a priority event has been enqueued. If this future is queried from the task thread, it is guaranteed that the priority event is available and can be retrieved via {@link #getNext()}. |
abstract void setup() | Sets up the gate; compared with mere construction this is a potentially heavyweight, blocking operation. |
abstract void requestPartitions() | Requests the consumed ResultPartitions (each channel requests its subpartition). |
3. InputGate Implementations
A Task obtains its input by repeatedly calling the InputGate's getNext method (getNextBufferOrEvent in earlier versions) and handing the returned data to the operator it wraps; this constitutes the basic processing loop of a Task, sketched below.
InputGate has two concrete implementations, SingleInputGate and UnionInputGate; a UnionInputGate is composed of multiple SingleInputGates.
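A simplified sketch of that consumption loop (this is not the actual Task / StreamTask code, just its shape; the class and method names here are illustrative):
import java.util.Optional;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;

class InputGateConsumptionSketch {
    static void consume(InputGate gate) throws Exception {
        while (true) {
            // Blocking variant; returns Optional.empty() once the gate is finished.
            Optional<BufferOrEvent> next = gate.getNext();
            if (!next.isPresent()) {
                break;
            }
            BufferOrEvent boe = next.get();
            if (boe.isBuffer()) {
                // Hand the buffer to the record deserializer / operator chain.
            } else {
                // Handle the event (e.g. EndOfPartitionEvent, CheckpointBarrier).
            }
        }
    }
}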
3.1. SingleInputGate
3.1.1. IndexedInputGate
SingleInputGate's parent class is IndexedInputGate, which defines some checkpoint-related methods:
Name | Description |
---|---|
abstract int getGateIndex() | Returns the index of this input gate. |
void checkpointStarted(CheckpointBarrier barrier) | Called when a checkpoint starts for this gate. |
void checkpointStopped(long cancelledCheckpointId) | Called when a checkpoint is stopped (cancelled). |
getInputGateIndex() | Returns the index of this input gate. |
void blockConsumption(InputChannelInfo channelInfo) | Not used. The network stack blocks consumption automatically by revoking credits. |
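For orientation, the gate index matters because channels are addressed globally by (gate index, channel index) pairs; that pair is what InputChannelInfo carries and what the checkpoint-related calls above use to identify a channel. A small sketch (the concrete values are made up, and the accessor names are my reading of the 1.12 API):
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;

class ChannelAddressingSketch {
    public static void main(String[] args) {
        // Hypothetical values: channel 3 of input gate 0.
        InputChannelInfo info = new InputChannelInfo(0, 3);
        System.out.println(info.getGateIdx() + " / " + info.getInputChannelIdx()); // 0 / 3
    }
}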
3.1.2. Fields
/** Lock object to guard partition requests and runtime channel updates. */
private final Object requestLock = new Object();
/**
 * The name of the owning task, for logging purposes.
 * e.g. owningTaskName = "Flat Map (2/4)#0 (0ef8b3d70af60be8633af8af4e1c0698)"
 */
private final String owningTaskName;
private final int gateIndex;
/**
 * The ID of the consumed intermediate result. Each input gate consumes partitions of the
 * intermediate result specified by this ID. This ID also identifies the input gate at the
 * consuming task.
 *
 * i.e. the ID of the result produced by the upstream operator, e.g.
 * {IntermediateDataSetID@8214} "5eba1007ad48ad2243891e1eff29c32b"
 */
private final IntermediateDataSetID consumedResultId;
/**
 * The type of the partition the input gate is consuming.
 * e.g. {ResultPartitionType@7380} "PIPELINED_BOUNDED"
 */
private final ResultPartitionType consumedPartitionType;
/**
 * The index of the consumed subpartition of each consumed partition. This index depends on the
 * {@link DistributionPattern} and the subtask indices of the producing and consuming task.
 */
private final int consumedSubpartitionIndex;
/** The number of input channels (equivalent to the number of consumed partitions). */
private final int numberOfInputChannels;
/**
 * Input channels. There is one input channel for each consumed intermediate result partition.
 * We store this in a map for runtime updates of single channels.
 *
 * i.e. a mapping from result partition to input channel, e.g.
 * inputChannels = {HashMap@8215} size = 1
 * {IntermediateResultPartitionID@8237} "5eba1007ad48ad2243891e1eff29c32b#0" -> {LocalRecoveredInputChannel@8238}
 */
private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;
/**
 * All input channels of this InputGate, indexed by channel index.
 * e.g. channels = {InputChannel[1]@8216}
 * 0 = {LocalRecoveredInputChannel@8238}
 */
@GuardedBy("requestLock")
private final InputChannel[] channels;
/**
 * Channels, which notified this input gate about available data. In other words, the queue of
 * InputChannels that currently have data ready for consumption.
 * e.g. inputChannelsWithData = {PrioritizedDeque@8217} "[]"
 */
private final PrioritizedDeque<InputChannel> inputChannelsWithData = new PrioritizedDeque<>();
/**
 * Field guaranteeing uniqueness for the inputChannelsWithData queue. Both of those fields
 * should be unified onto one.
 * e.g. enqueuedInputChannelsWithData = {BitSet@8218} "{}"
 */
@GuardedBy("inputChannelsWithData")
private final BitSet enqueuedInputChannelsWithData;
// Channels that have already delivered an EndOfPartitionEvent.
private final BitSet channelsWithEndOfPartitionEvents;
// The last priority sequence number seen per channel.
@GuardedBy("inputChannelsWithData")
private int[] lastPrioritySequenceNumber;
/** The partition producer state listener. */
private final PartitionProducerStateProvider partitionProducerStateProvider;
/**
 * Buffer pool for incoming buffers. Incoming data from remote channels is copied to buffers
 * from this pool.
 *
 * i.e. the local memory manager, e.g.
 * {LocalBufferPool@8221} "[size: 8, required: 1, requested: 1, available: 1, max: 8, listeners: 0,subpartitions: 0, maxBuffersPerChannel: 2147483647, destroyed: false]"
 */
private BufferPool bufferPool;
private boolean hasReceivedAllEndOfPartitionEvents;
/** Flag indicating whether partitions have been requested. */
private boolean requestedPartitionsFlag;
/** Pending task events, buffered until the unknown channels are updated. */
private final List<TaskEvent> pendingEvents = new ArrayList<>();
// Number of channels that are still uninitialized.
private int numberOfUninitializedChannels;
/** A timer to retrigger local partition requests. Only initialized if actually needed. */
private Timer retriggerLocalRequestTimer;
// Factory for the buffer pool of this gate.
// e.g. {SingleInputGateFactory$lambda@8223}
private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;
private final CompletableFuture<Void> closeFuture;
@Nullable private final BufferDecompressor bufferDecompressor;
// e.g. {NetworkBufferPool@7512}
private final MemorySegmentProvider memorySegmentProvider;
/**
 * The segment to read data from the file region of a bounded blocking partition via a local
 * input channel.
 * e.g. {HybridMemorySegment@8225}
 */
private final MemorySegment unpooledSegment;
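The interplay between inputChannelsWithData and enqueuedInputChannelsWithData is worth spelling out: the BitSet guarantees that a channel is enqueued at most once, no matter how many availability notifications it fires. Here is a sketch of that pattern built from plain JDK types (my own illustration, not the SingleInputGate code):
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Deque;

class DataAvailabilityQueueSketch {

    private final Deque<Integer> channelsWithData = new ArrayDeque<>();
    private final BitSet enqueued = new BitSet();

    /** Returns true if this was the first pending notification for the channel. */
    synchronized boolean queueChannel(int channelIndex) {
        if (enqueued.get(channelIndex)) {
            return false; // already queued, do not add a duplicate entry
        }
        enqueued.set(channelIndex);
        channelsWithData.add(channelIndex);
        return true;
    }

    /** Removes and returns the next channel with data, or null if the queue is empty. */
    synchronized Integer pollChannel() {
        Integer channelIndex = channelsWithData.poll();
        if (channelIndex != null) {
            enqueued.clear(channelIndex);
        }
        return channelIndex;
    }
}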
3.1.3. setup
During the InputGate's setup phase, exclusive memory is assigned to every input channel. Looking at SingleInputGate's setup method:
essentially it allocates the LocalBufferPool (LocalBufferPool@8221 in the debug session above); all input channels of the same InputGate share this one LocalBufferPool.
@Override
public void setup() throws IOException {
checkState(
this.bufferPool == null,
"Bug in input gate setup logic: Already registered buffer pool.");
// Assign exclusive buffers to every InputChannel; the remaining buffers serve as floating buffers.
setupChannels();
// Create the buffer pool that provides the floating buffers ...
BufferPool bufferPool = bufferPoolFactory.get();
// ... and register it with this input gate.
setBufferPool(bufferPool);
}
/** Assign the exclusive buffers to all remote input channels directly for credit-based mode. */
@VisibleForTesting
public void setupChannels() throws IOException {
synchronized (requestLock) {
for (InputChannel inputChannel : inputChannels.values()) {
// Call setup() on each InputChannel of this SingleInputGate.
inputChannel.setup();
}
}
}
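To put rough numbers on the "exclusive plus floating" split described above, assume the default settings taskmanager.network.memory.buffers-per-channel = 2 and taskmanager.network.memory.floating-buffers-per-gate = 8 (an assumption for illustration, not something read from the code above); a gate's buffer budget then works out as in this small sketch:
class GateBufferBudgetSketch {
    public static void main(String[] args) {
        int numberOfInputChannels = 4;      // hypothetical gate with 4 channels
        int exclusiveBuffersPerChannel = 2; // assigned per (remote) channel in setupChannels()
        int floatingBuffersPerGate = 8;     // served on demand by the shared LocalBufferPool

        int totalBuffers =
                numberOfInputChannels * exclusiveBuffersPerChannel + floatingBuffersPerGate;
        System.out.println("Network buffers for this gate: " + totalBuffers); // 16
    }
}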
3.1.4. requestPartitions
// Request the consumed partitions.
@Override
public void requestPartitions() {
synchronized (requestLock) {
// Partitions may only be requested once; after the first call this flag is set to true.
if (!requestedPartitionsFlag) {
if (closeFuture.isDone()) {
throw new IllegalStateException("Already released.");
}
// Sanity checks
if (numberOfInputChannels != inputChannels.size()) {
throw new IllegalStateException(
String.format(
"Bug in input gate setup logic: mismatch between "
+ "number of total input channels [%s] and the currently set number of input "
+ "channels [%s].",
inputChannels.size(), numberOfInputChannels));
}
convertRecoveredInputChannels();
// Request the partition data.
internalRequestPartitions();
}
// Set the flag after the call so that subsequent invocations become no-ops.
requestedPartitionsFlag = true;
}
}
// Request data from every channel.
private void internalRequestPartitions() {
for (InputChannel inputChannel : inputChannels.values()) {
try {
// Each channel requests its corresponding subpartition.
inputChannel.requestSubpartition(consumedSubpartitionIndex);
} catch (Throwable t) {
inputChannel.setError(t);
return;
}
}
}
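Putting setup() and requestPartitions() together, the life-cycle order looks roughly like this (a hedged sketch: the helper method is made up, and in real Flink this sequencing is driven by the Task / StreamTask startup path rather than by user code):
import java.io.IOException;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;

class GateLifecycleSketch {
    static void startConsuming(SingleInputGate gate) throws IOException {
        gate.setup();             // assign exclusive buffers and create the shared LocalBufferPool
        gate.requestPartitions(); // every channel requests its consumed subpartition
        gate.requestPartitions(); // no-op: requestedPartitionsFlag is already true
    }
}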
3.1.5. getChannel(int channelIndex)
Returns the InputChannel at the given channelIndex.
@Override
public InputChannel getChannel(int channelIndex) {
return channels[channelIndex];
}
3.1.6. updateInputChannel
Depending on whether the producer is local, converts an UnknownInputChannel into a LocalInputChannel or a RemoteInputChannel.
public void updateInputChannel(
ResourceID localLocation, NettyShuffleDescriptor shuffleDescriptor)
throws IOException, InterruptedException {
synchronized (requestLock) {
if (closeFuture.isDone()) {
// There was a race with a task failure/cancel
return;
}
IntermediateResultPartitionID partitionId =
shuffleDescriptor.getResultPartitionID().getPartitionId();
InputChannel current = inputChannels.get(partitionId);
// The InputChannel is still unknown ...
if (current instanceof UnknownInputChannel) {
UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
boolean isLocal = shuffleDescriptor.isLocalTo(localLocation);
InputChannel newChannel;
if (isLocal) {
// LocalInputChannel
newChannel = unknownChannel.toLocalInputChannel();
} else {
// RemoteInputChannel
RemoteInputChannel remoteInputChannel =
unknownChannel.toRemoteInputChannel(
shuffleDescriptor.getConnectionId());
remoteInputChannel.setup();
newChannel = remoteInputChannel;
}
LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);
inputChannels.put(partitionId, newChannel);
channels[current.getChannelIndex()] = newChannel;
if (requestedPartitionsFlag) {
newChannel.requestSubpartition(consumedSubpartitionIndex);
}
for (TaskEvent event : pendingEvents) {
newChannel.sendTaskEvent(event);
}
if (--numberOfUninitializedChannels == 0) {
pendingEvents.clear();
}
}
}
}
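The heart of that conversion is just the locality test. Isolated into a tiny sketch (the ResourceIDs here are hypothetical; in the real code the check is shuffleDescriptor.isLocalTo(localLocation)):
import org.apache.flink.runtime.clusterframework.types.ResourceID;

class ChannelLocalitySketch {
    public static void main(String[] args) {
        ResourceID producerLocation = ResourceID.generate();
        ResourceID consumerLocation = producerLocation; // same TaskManager -> local channel

        boolean isLocal = producerLocation.equals(consumerLocation);
        String channelType = isLocal ? "LocalInputChannel" : "RemoteInputChannel";
        System.out.println(channelType); // LocalInputChannel
    }
}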
3.1.7. retriggerPartitionRequest
Retriggers a partition request; it simply calls retriggerSubpartitionRequest on the corresponding InputChannel.
/** Retriggers a partition request. */
public void retriggerPartitionRequest(IntermediateResultPartitionID partitionId)
throws IOException {
synchronized (requestLock) {
if (!closeFuture.isDone()) {
final InputChannel ch = inputChannels.get(partitionId);
checkNotNull(ch, "Unknown input channel with ID " + partitionId);
LOG.debug(
"{}: Retriggering partition request {}:{}.",
owningTaskName,
ch.partitionId,
consumedSubpartitionIndex);
if (ch.getClass() == RemoteInputChannel.class) {
// RemoteInputChannel
final RemoteInputChannel rch = (RemoteInputChannel) ch;
rch.retriggerSubpartitionRequest(consumedSubpartitionIndex);
} else if (ch.getClass() == LocalInputChannel.class) {
// LocalInputChannel
final LocalInputChannel ich = (LocalInputChannel) ch;
if (retriggerLocalRequestTimer == null) {
retriggerLocalRequestTimer = new Timer(true);
}
ich.retriggerSubpartitionRequest(
retriggerLocalRequestTimer, consumedSubpartitionIndex);
} else {
throw new IllegalStateException(
"Unexpected type of channel to retrigger partition: " + ch.getClass());
}
}
}
}
3.1.8. close
Close: releases the resources of every InputChannel and lazily releases the LocalBufferPool; if the release succeeds, it notifies all ...
@Override
public void close() throws IOException {
boolean released = false;
synchronized (requestLock) {
if (!closeFuture.isDone()) {
try {
LOG.debug("{}: Releasing {}.", owningTaskName, 以上是关于flinkFlink 1.12.2 源码浅析 : Task数据输入的主要内容,如果未能解决你的问题,请参考以下文章
flinkFlink 1.12.2 源码浅析 : Task 浅析
flinkFlink 1.12.2 源码浅析 : Task数据输入
flinkFlink 1.12.2 源码浅析 :Task数据输出
flinkFlink 1.12.2 源码浅析 : yarn-per-job模式解析 TaskMasger 启动
flinkFlink 1.12.2 源码浅析 : yarn-per-job模式解析 yarn 提交过程解析
flinkFlink 1.12.2 源码浅析 : yarn-per-job模式解析 JobMasger启动 YarnJobClusterEntrypoint