Flink 1.12.2 Source Code Analysis: A Brief Look at StreamTask
Posted by 九师兄
1. Overview
Reposted from: "Flink 1.12.2 Source Code Analysis: A Brief Look at StreamTask"
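In the doRun method of the Task class, a RuntimeEnvironment is built first. Then loadAndInstantiateInvokable is called to load and instantiate the task's executable code (its "invokable").
loadAndInstantiateInvokable takes three arguments: the class loader userCodeClassLoader.asClassLoader(), the name of the class to instantiate (nameOfInvokableClass), and the RuntimeEnvironment that carries everything the task needs to run.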
org.apache.flink.runtime.taskmanager.Task#doRun
private void doRun() {
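// initialize state-related code ...
// load the various task-related resources needed to run the code ...
// request and initialize the user code & methods
// build the environment the task code runs in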
Environment env =
new RuntimeEnvironment(
jobId,
vertexId,
executionId,
executionConfig,
taskInfo,
jobConfiguration,
taskConfiguration,
userCodeClassLoader,
memoryManager,
ioManager,
broadcastVariableManager,
taskStateManager,
aggregateManager,
accumulatorRegistry,
kvStateRegistry,
inputSplitProvider,
distributedCacheEntries,
consumableNotifyingPartitionWriters,
inputGates,
taskEventDispatcher,
checkpointResponder,
operatorCoordinatorEventGateway,
taskManagerConfig,
metrics,
this,
externalResourceInfoProvider);
// now load and instantiate the task's invokable code
invokable =
loadAndInstantiateInvokable(
userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
// run the task's code
invokable.invoke();
// remaining code omitted ...
}
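At its core, loadAndInstantiateInvokable loads the class by name through the user-code class loader and calls its constructor that takes an Environment. The sketch below mimics that with plain reflection. Environment, AbstractInvokable, DemoTask and InvokableLoader are simplified, hypothetical stand-ins (not Flink's classes), and the error messages are illustrative.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;

// Hypothetical stand-in for Flink's Environment: only carries a task name.
class Environment {
    final String taskName;

    Environment(String taskName) {
        this.taskName = taskName;
    }
}

// Hypothetical stand-in for Flink's AbstractInvokable.
abstract class AbstractInvokable {
    private final Environment environment;

    AbstractInvokable(Environment environment) {
        this.environment = environment;
    }

    Environment getEnvironment() {
        return environment;
    }

    abstract String invoke() throws Exception;
}

// Hypothetical task; getConstructor() only finds public constructors.
class DemoTask extends AbstractInvokable {
    public DemoTask(Environment environment) {
        super(environment);
    }

    @Override
    String invoke() {
        return "running " + getEnvironment().taskName;
    }
}

final class InvokableLoader {
    // Loads className through classLoader and calls its (Environment) constructor.
    static AbstractInvokable loadAndInstantiate(
            ClassLoader classLoader, String className, Environment env) throws Throwable {
        final Class<? extends AbstractInvokable> invokableClass;
        try {
            invokableClass =
                    Class.forName(className, true, classLoader).asSubclass(AbstractInvokable.class);
        } catch (ClassNotFoundException | ClassCastException e) {
            throw new Exception("Could not load the task's invokable class.", e);
        }

        final Constructor<? extends AbstractInvokable> ctor;
        try {
            ctor = invokableClass.getConstructor(Environment.class);
        } catch (NoSuchMethodException e) {
            throw new Exception("Task misses proper constructor", e);
        }

        try {
            return ctor.newInstance(env);
        } catch (InvocationTargetException e) {
            // forward the exception thrown by the constructor itself
            throw e.getTargetException();
        }
    }
}
```

Using DemoTask.class.getName() as the class name keeps the example independent of the package it is compiled in. Because the constructor is looked up only at run time, a task class without an (Environment) constructor fails at this point rather than at compile time.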
The main StreamTask classes that nameOfInvokableClass can refer to are:
Name | Description |
---|---|
org.apache.flink.streaming.runtime.tasks.SourceStreamTask | StreamTask for sources |
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask | StreamTask with one input |
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask | StreamTask with two inputs |
org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask | StreamTask with multiple inputs |
org.apache.flink.streaming.runtime.tasks.StreamIterationHead | A special StreamTask used for executing feedback edges (the head of an iteration). |
org.apache.flink.streaming.runtime.tasks.StreamIterationTail | A special StreamTask used for executing feedback edges (the tail of an iteration). |
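2. AbstractInvokable
This is the abstract base class for every task that can be executed by a TaskManager.
Concrete tasks extend this class, for example the streaming and batch tasks.
The TaskManager invokes the {@link #invoke()} method when executing a task.
All operations of the task happen in this method (setting up input and output stream readers and writers as well as the task's core operation).
All classes that extend it must offer a constructor {@code MyTask(Environment, TaskStateSnapshot)}.
Tasks that are always stateless can, for convenience, implement only the constructor {@code MyTask(Environment)}.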
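Developer note:
While the constructors cannot be enforced at compile time, we have not yet ventured into introducing factories (it is only an internal API after all, and with Java 8 one can use {@code Class::new} almost like a factory lambda).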
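The point about {@code Class::new} can be illustrated with a constructor reference used as a factory. A minimal sketch; Environment, AbstractInvokable, MyTask and FactoryDemo are simplified, hypothetical stand-ins, and the String-returning invoke() exists only to make the demo observable.

```java
import java.util.function.Function;

// Hypothetical stand-in for Flink's Environment.
class Environment {
    final String taskName;

    Environment(String taskName) {
        this.taskName = taskName;
    }
}

// Hypothetical stand-in: the only contract is the (Environment) constructor.
abstract class AbstractInvokable {
    protected final Environment environment;

    AbstractInvokable(Environment environment) {
        this.environment = environment;
    }

    abstract String invoke();
}

// Hypothetical always-stateless task that only implements MyTask(Environment).
class MyTask extends AbstractInvokable {
    MyTask(Environment environment) {
        super(environment);
    }

    @Override
    String invoke() {
        return "invoked " + environment.taskName;
    }
}

final class FactoryDemo {
    // The constructor reference acts as a factory lambda and is checked by the compiler.
    static final Function<Environment, AbstractInvokable> FACTORY = MyTask::new;

    static String run(String taskName) {
        return FACTORY.apply(new Environment(taskName)).invoke();
    }
}
```

Unlike a reflective lookup, MyTask::new fails to compile if the matching constructor is missing; the trade-off is that the class must be known at compile time, which is not the case for user code loaded by name.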
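NOTE:
There is no constructor that accepts an initial task state snapshot and stores it in a variable.
That is on purpose: the AbstractInvokable itself does not need the state snapshot (only subclasses such as StreamTask need state), and we do not want to keep a reference indefinitely, which would prevent the garbage collector from cleaning up the initial state structures.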
Any subclass that supports recoverable state and participates in checkpointing needs to override
{@link #triggerCheckpointAsync(CheckpointMetaData, CheckpointOptions, boolean)},
{@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetricsBuilder)},
{@link #abortCheckpointOnBarrier(long, Throwable)} and {@link #notifyCheckpointCompleteAsync(long)}.
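A minimal sketch of this "override to opt in" contract, assuming the base class's checkpoint hooks simply reject the call with UnsupportedOperationException unless a subclass overrides them. The signatures are deliberately simplified (Flink's methods take CheckpointMetaData, CheckpointOptions and more), and StatelessTask and CheckpointingTask are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in: checkpoint hooks fail unless a subclass overrides them.
abstract class AbstractInvokable {
    abstract void invoke() throws Exception;

    // Simplified signature (assumption): only the checkpoint id is passed.
    Future<Boolean> triggerCheckpointAsync(long checkpointId) {
        throw new UnsupportedOperationException(
                "triggerCheckpointAsync not supported by " + getClass().getName());
    }

    Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
        throw new UnsupportedOperationException(
                "notifyCheckpointCompleteAsync not supported by " + getClass().getName());
    }
}

// A task without recoverable state keeps the defaults.
class StatelessTask extends AbstractInvokable {
    @Override
    void invoke() {}
}

// A hypothetical task that takes part in checkpointing and therefore overrides the hooks.
class CheckpointingTask extends AbstractInvokable {
    long lastCompleted = -1;

    @Override
    void invoke() {}

    @Override
    Future<Boolean> triggerCheckpointAsync(long checkpointId) {
        // a real task would snapshot its operators here
        return CompletableFuture.completedFuture(true);
    }

    @Override
    Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
        lastCompleted = checkpointId;
        return CompletableFuture.completedFuture(null);
    }
}
```

Failing loudly in the base class means a checkpoint sent to a task that cannot take part in checkpointing surfaces as an error instead of silently succeeding.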
2.1. Fields & initialization
The AbstractInvokable abstract class has only two fields: Environment environment and shouldInterruptOnCancel, which defaults to true.
Fields
/**
* The environment assigned to this invokable.
* */
private final Environment environment;
/**
* Flag whether cancellation should interrupt the executing thread.
* */
private volatile boolean shouldInterruptOnCancel = true;
Constructor
The constructor just takes an Environment object (a null argument is rejected by checkNotNull).
/**
* Create an Invokable task and set its environment.
*
* @param environment The environment assigned to this invokable.
*/
public AbstractInvokable(Environment environment) {
this.environment = checkNotNull(environment);
}
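The shouldInterruptOnCancel flag matters on the cancellation path: roughly speaking, Flink's Task consults shouldInterruptOnCancel() before interrupting the thread that runs invoke(). The sketch below shows that idea with hypothetical classes (SleepingTask, Canceller); the timings are arbitrary.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for AbstractInvokable's interrupt flag.
abstract class AbstractInvokable {
    private volatile boolean shouldInterruptOnCancel = true;

    abstract void invoke();

    void cancel() {}

    void setShouldInterruptOnCancel(boolean shouldInterruptOnCancel) {
        this.shouldInterruptOnCancel = shouldInterruptOnCancel;
    }

    boolean shouldInterruptOnCancel() {
        return shouldInterruptOnCancel;
    }
}

// Hypothetical task that blocks in invoke() until it is interrupted.
class SleepingTask extends AbstractInvokable {
    final CountDownLatch started = new CountDownLatch(1);
    volatile boolean wasInterrupted;

    @Override
    void invoke() {
        started.countDown();
        try {
            Thread.sleep(10_000);
        } catch (InterruptedException e) {
            wasInterrupted = true;
        }
    }
}

final class Canceller {
    // Hypothetical cancel path: interrupt the executing thread only if the task allows it.
    static void cancel(AbstractInvokable invokable, Thread executingThread) {
        invokable.cancel();
        if (invokable.shouldInterruptOnCancel()) {
            executingThread.interrupt();
        }
    }

    // Starts a SleepingTask, cancels it and reports whether invoke() saw an interrupt.
    static boolean runAndCancel(boolean interruptOnCancel) throws InterruptedException {
        SleepingTask task = new SleepingTask();
        task.setShouldInterruptOnCancel(interruptOnCancel);
        Thread thread = new Thread(task::invoke, "task-thread");
        thread.setDaemon(true);
        thread.start();
        task.started.await();
        cancel(task, thread);
        thread.join(1_000);
        boolean result = task.wasInterrupted;
        thread.interrupt(); // release the thread if the flag kept it sleeping
        thread.join();
        return result;
    }
}
```

The field is volatile because it is written by the task while it runs and read by the thread performing the cancellation.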
2.2. Environment
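Environment is the constructor argument of the AbstractInvokable abstract class and its subclasses. In Task#doRun, the environment information is wrapped in RuntimeEnvironment, an implementation of Environment,
which is handed to the task implementation (for example SourceStreamTask or OneInputStreamTask).
There is not much more to it: it simply bundles a set of references to the runtime environment. The fields of RuntimeEnvironment are: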
private final JobID jobId;
private final JobVertexID jobVertexId;
private final ExecutionAttemptID executionId;
private final TaskInfo taskInfo;
private final Configuration jobConfiguration;
private final Configuration taskConfiguration;
private final ExecutionConfig executionConfig;
private final UserCodeClassLoader userCodeClassLoader;
private final MemoryManager memManager;
private final IOManager ioManager;
private final BroadcastVariableManager bcVarManager;
private final TaskStateManager taskStateManager;
private final GlobalAggregateManager aggregateManager;
private final InputSplitProvider splitProvider;
private final ExternalResourceInfoProvider externalResourceInfoProvider;
private final Map<String, Future<Path>> distCacheEntries;
private final ResultPartitionWriter[] writers;
private final IndexedInputGate[] inputGates;
private final TaskEventDispatcher taskEventDispatcher;
private final CheckpointResponder checkpointResponder;
private final TaskOperatorEventGateway operatorEventGateway;
private final AccumulatorRegistry accumulatorRegistry;
private final TaskKvStateRegistry kvStateRegistry;
private final TaskManagerRuntimeInfo taskManagerInfo;
private final TaskMetricGroup metrics;
private final Task containingTask;
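2.3. Methods
Core methods
Name | Description |
---|---|
invoke | Starts the execution. Must be overridden by the concrete task implementation. The TaskManager calls this method when the actual execution of the task starts. |
cancel | Called when the task is canceled, either because the user aborted it or because the execution failed. It can be overridden to react to the cancellation and shut down the user code properly. |
shouldInterruptOnCancel | Whether the thread executing the {@link #invoke()} method should be interrupted during cancellation. The flag applies to both the initial interrupt and the repeated interrupts. |
dispatchOperatorEvent | Entry point through which Operator Events from outside influence the task's execution. |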
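Checkpoint-related methods
Name | Description |
---|---|
triggerCheckpointAsync | Called asynchronously by the checkpoint coordinator to trigger a checkpoint. |
triggerCheckpointOnBarrier | Called when a checkpoint is triggered because checkpoint barriers were received on all input streams. |
abortCheckpointOnBarrier | Aborts a checkpoint as the result of receiving some checkpoint barriers … |
notifyCheckpointCompleteAsync | Notifies the task that a checkpoint has completed. |
notifyCheckpointAbortAsync | Notifies the task that a checkpoint has been aborted. |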
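3. StreamTask
Base class for all streaming tasks.
A task is the unit of local processing that is deployed and executed by the TaskManagers.
Each task runs one or more {@link StreamOperator}s, which form the task's operator chain.
Operators that are chained together execute synchronously in the same thread and hence on the same stream partition.
A common case for these chains are successive map/flatmap/filter tasks.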
The task chain contains one "head" operator and multiple chained operators.
The StreamTask is specialized for the type of the head operator:
one-input: OneInputStreamTask
two-input: TwoInputStreamTask
sources: SourceStreamTask
iteration heads: StreamIterationHead
iteration tails: StreamIterationTail
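The Task class deals with the setup of the streams read by the head operator and of the streams produced by the operators at the ends of the operator chain.
Note that the chain may fork and thus have multiple ends.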
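To make "chained operators execute synchronously in the same thread" concrete: each operator's output can simply call the next operator directly, so a record travels through the whole chain within one method call, and a fork just calls several downstream outputs. This is a hypothetical sketch (the Output interface and the ChainDemo helpers are not Flink's OperatorChain or ChainingOutput code).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical operator output: collecting a record calls the next operator directly.
interface Output<T> {
    void collect(T record);
}

final class ChainDemo {
    // map: transforms each record and hands it straight to the downstream output
    static <I, O> Output<I> map(Function<I, O> fn, Output<O> next) {
        return record -> next.collect(fn.apply(record));
    }

    // filter: forwards only the records that match
    static <T> Output<T> filter(Predicate<T> predicate, Output<T> next) {
        return record -> {
            if (predicate.test(record)) {
                next.collect(record);
            }
        };
    }

    // fork: one operator feeding several downstream outputs (a chain with multiple ends)
    @SafeVarargs
    static <T> Output<T> broadcast(Output<T>... outputs) {
        return record -> {
            for (Output<T> out : outputs) {
                out.collect(record);
            }
        };
    }

    // Builds map(trim) -> filter(non-empty) -> fork(upper-case end, length end).
    static List<List<String>> run(List<String> input) {
        List<String> upperEnd = new ArrayList<>();
        List<String> lengthEnd = new ArrayList<>();
        Output<String> upperSink = upperEnd::add;
        Output<String> lengthSink = lengthEnd::add;
        Output<String> toUpper = map((String s) -> s.toUpperCase(), upperSink);
        Output<String> toLength = map((String s) -> String.valueOf(s.length()), lengthSink);
        Output<String> fork = broadcast(toUpper, toLength);
        Output<String> nonEmpty = filter((String s) -> !s.isEmpty(), fork);
        Output<String> head = map((String s) -> s.trim(), nonEmpty);

        // every record passes through the whole chain on the calling thread
        for (String record : input) {
            head.collect(record);
        }
        List<List<String>> ends = new ArrayList<>();
        ends.add(upperEnd);
        ends.add(lengthEnd);
        return ends;
    }
}
```

Since no queue or thread hand-off sits between chained operators, chaining saves serialization and network transfer between them, which is why successive map/flatmap/filter steps are typical candidates.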
The life cycle of the task is set up as follows:
<pre>{@code
-- setInitialState -> provides the state of all operators in the chain
-- invoke()
|
+----> Create basic utils (config, etc) and load the chain of operators
+----> operators.setup()
+----> task specific init()
+----> initialize-operator-states()
+----> open-operators()
+----> run()
+----> close-operators()
+----> dispose-operators()
+----> common cleanup
+----> task specific cleanup()
}</pre>
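The life cycle above can be sketched as a template method that only records the order of its steps. LifecycleTask and PrintTask are hypothetical; the try/finally split is an assumption that mirrors the invoke() code shown in 3.2, where afterInvoke() (closing operators) runs only on success while cleanUpInvoke() (disposal and cleanup) runs on both paths.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical skeleton that records the order of the life-cycle steps.
abstract class LifecycleTask {
    final List<String> steps = new ArrayList<>();

    final void invoke() throws Exception {
        steps.add("create utils & load operator chain");
        steps.add("operators.setup()");
        init(); // task specific init()
        steps.add("initialize-operator-states()");
        steps.add("open-operators()");
        try {
            run();
            steps.add("close-operators()"); // only reached if run() succeeded
        } finally {
            steps.add("dispose-operators()");
            steps.add("common cleanup");
            cleanup(); // task specific cleanup()
        }
    }

    abstract void init() throws Exception;

    abstract void run() throws Exception;

    abstract void cleanup() throws Exception;
}

// Hypothetical concrete task that just logs its own hooks.
class PrintTask extends LifecycleTask {
    @Override
    void init() {
        steps.add("init()");
    }

    @Override
    void run() {
        steps.add("run()");
    }

    @Override
    void cleanup() {
        steps.add("cleanup()");
    }
}
```

Declaring invoke() final keeps the order fixed while subclasses only fill in init(), run() and cleanup(), which matches the "task specific" steps in the diagram.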
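{@code StreamTask} has a lock object called {@code lock}.
All calls to methods on a {@code StreamOperator} must be synchronized on this lock object to ensure that no methods are called concurrently.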
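The lock rule amounts to "every action touching an operator runs while holding one shared monitor". As the actionExecutor Javadoc in 3.1 notes, in 1.12 the checkpoint lock is superseded by the mailbox, with SynchronizedStreamTaskActionExecutor still providing a lock to SourceStreamTask. The sketch below illustrates only the locking idea; CountingOperator, SynchronizedActionExecutor and LockDemo are hypothetical names, not Flink's classes.

```java
import java.util.concurrent.Callable;

// Hypothetical non-thread-safe operator: an unsynchronized read-modify-write.
class CountingOperator {
    private long count;

    void processElement() {
        count++;
    }

    long snapshotState() {
        return count;
    }
}

// Hypothetical executor that runs every action while holding the task's lock.
final class SynchronizedActionExecutor {
    private final Object lock;

    SynchronizedActionExecutor(Object lock) {
        this.lock = lock;
    }

    void run(Runnable action) {
        synchronized (lock) {
            action.run();
        }
    }

    <R> R call(Callable<R> callable) throws Exception {
        synchronized (lock) {
            return callable.call();
        }
    }
}

final class LockDemo {
    // Two threads call the same operator; the shared lock keeps the increments from racing.
    static long run(int perThread) throws Exception {
        CountingOperator operator = new CountingOperator();
        SynchronizedActionExecutor executor = new SynchronizedActionExecutor(new Object());
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) {
                executor.run(operator::processElement);
            }
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // a "snapshot" taken under the same lock sees a consistent value
        return executor.call(operator::snapshotState);
    }
}
```

Without the synchronized blocks, the two threads could lose increments, and a snapshot could observe an operator in the middle of processing a record, which is exactly what would break a consistent checkpoint.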
3.1. Fields & constructor
Fields
/** The thread group that holds all trigger timer threads. */
public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
/** The logger used by the StreamTask and its subclasses. */
protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
// ------------------------------------------------------------------------
/**
* All actions outside of the task {@link #mailboxProcessor mailbox}
* (i.e. performed by another thread)
* must be executed through this executor to ensure that we don't have concurrent method
* calls that void consistent checkpoints.
*
* <p>CheckpointLock is superseded by {@link MailboxExecutor}, with {@link
* StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor
* SynchronizedStreamTaskActionExecutor} to provide lock to {@link SourceStreamTask}.
*/
private final StreamTaskActionExecutor actionExecutor;
/**
* The input processor. Initialized in {@link #init()} method. */
@Nullable protected StreamInputProcessor inputProcessor;
/**
* [Important] The main operator that consumes the input streams of this task.
* */
protected OP mainOperator;
/**
* The chain of operators executed by this task. */
protected OperatorChain<OUT, OP> operatorChain;
/**
* The configuration of this streaming task. */
protected final StreamConfig configuration;
/**
* Our state backend. We use this to create checkpoint streams and a keyed state backend. */
protected final StateBackend stateBackend;
/**
* The checkpoint coordinator of this subtask.
*/
private final SubtaskCheckpointCoordinator subtaskCheckpointCoordinator;
/**
* The internal {@link TimerService} used to define the current processing time (default =
* {@code System.currentTimeMillis()}) and register timers for tasks to be executed in the
* future.
*/
protected final TimerService timerService;
/**
* The currently active background materialization threads.
* */
private final CloseableRegistry cancelables = new CloseableRegistry();
/**
* Handler for exceptions raised asynchronously, i.e. outside the task thread.
*/
private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
/**
* Flag to mark the task "in operation", in which case check needs to be initialized to true, so
* that early cancel() before invoke() behaves correctly.
*/
private volatile boolean isRunning;
/**
* Flag to mark this task as canceled. */
private volatile boolean canceled;
/**
* Flag to mark this task as failing, i.e. if an exception has occurred inside {@link
* #invoke()}.
*/
private volatile boolean failing;
/**
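* Whether the operators have already been disposed, so they are not disposed twice (reset to false in beforeInvoke()).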
*/
private boolean disposedOperators;
/** Thread pool for async snapshot workers. */
private final ExecutorService asyncOperationsThreadPool;
private final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
protected final MailboxProcessor mailboxProcessor;
final MailboxExecutor mainMailboxExecutor;
/** TODO it might be replaced by the global IO executor on TaskManager level future. */
private final ExecutorService channelIOExecutor;
private Long syncSavepointId = null;
private Long activeSyncSavepointId = null;
private long latestAsyncCheckpointStartDelayNanos;
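The constructor consists mostly of plain assignments. The line worth noting is this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);, which registers processInput as the default action of the task's mailbox loop.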
protected StreamTask(
Environment environment,
@Nullable TimerService timerService,
Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
StreamTaskActionExecutor actionExecutor,
TaskMailbox mailbox)
throws Exception {
super(environment);
this.configuration = new StreamConfig(getTaskConfiguration());
this.recordWriter = createRecordWriterDelegate(configuration, environment);
this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
this.mailboxProcessor.initMetric(environment.getMetricGroup());
this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
this.asyncOperationsThreadPool =
Executors.newCachedThreadPool(
new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));
this.stateBackend = createStateBackend();
// coordinator that performs the checkpoints of this subtask
this.subtaskCheckpointCoordinator =
new SubtaskCheckpointCoordinatorImpl(
stateBackend.createCheckpointStorage(getEnvironment().getJobID()),
getName(),
actionExecutor,
getCancelables(),
getAsyncOperationsThreadPool(),
getEnvironment(),
this,
configuration.isUnalignedCheckpointsEnabled(),
this::prepareInputSnapshot);
// if the clock is not already set, then assign a default TimeServiceProvider
if (timerService == null) {
ThreadFactory timerThreadFactory =
new DispatcherThreadFactory(
TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
this.timerService =
new SystemProcessingTimeService(this::handleTimerException, timerThreadFactory);
} else {
this.timerService = timerService;
}
this.channelIOExecutor =
Executors.newSingleThreadExecutor(
new ExecutorThreadFactory("channel-state-unspilling"));
injectChannelStateWriterIntoChannels();
}
3.2. invoke
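invoke is the core method of the task. It runs in four phases:
Before invoke: beforeInvoke();
Invoke: runMailboxLoop();
After invoke: afterInvoke(); (only on the success path)
Cleanup: cleanUpInvoke(); (on both the success and the failure path)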
// e.g. for tasks running operators such as map ...
@Override
public final void invoke() throws Exception {
try {
// initialization ...
beforeInvoke();
// final check to exit early before starting to run
if (canceled) {
throw new CancelTaskException();
}
// [core] run the task ...
// let the task do its work
runMailboxLoop();
// if this left the run() method cleanly despite the fact that this was canceled,
// make sure the "clean shutdown" is not attempted
if (canceled) {
throw new CancelTaskException();
}
afterInvoke();
} catch (Throwable invokeException) {
failing = !canceled;
try {
cleanUpInvoke();
}
// TODO: investigate why Throwable instead of Exception is used here.
catch (Throwable cleanUpException) {
Throwable throwable =
ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
ExceptionUtils.rethrowException(throwable);
}
ExceptionUtils.rethrowException(invokeException);
}
cleanUpInvoke();
}
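The catch block in invoke() keeps the original failure as the primary exception: ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException) returns invokeException and attaches cleanUpException to it as a suppressed exception. The sketch below re-implements that contract (it is not Flink's code) and reproduces the shape of invoke(); InvokeErrorHandling and Step are hypothetical names.

```java
import java.util.Objects;

final class InvokeErrorHandling {
    // Same contract as ExceptionUtils.firstOrSuppressed, re-implemented for the sketch.
    static <T extends Throwable> T firstOrSuppressed(T newException, T previous) {
        Objects.requireNonNull(newException, "newException");
        if (previous == null || previous == newException) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    interface Step {
        void run() throws Exception;
    }

    // Shape of StreamTask#invoke: cleanup always runs; on failure the invoke exception
    // stays primary and a cleanup exception is attached to it as suppressed.
    static void invoke(Step body, Step cleanUp) throws Throwable {
        try {
            body.run();
        } catch (Throwable invokeException) {
            try {
                cleanUp.run();
            } catch (Throwable cleanUpException) {
                throw firstOrSuppressed(cleanUpException, invokeException);
            }
            throw invokeException;
        }
        cleanUp.run();
    }
}
```

Reporting the invoke exception first matters in practice: it usually explains why the task failed, while a cleanup failure is often just a consequence of it.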
3.2.1. beforeInvoke
Builds the OperatorChain and runs the init() method of the instantiated task class ...
protected void beforeInvoke() throws Exception {
disposedOperators = false;
// Initializing Source: Socket Stream -> Flat Map (1/1)#0.
// Initializing Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1)#0.
LOG.debug("Initializing {}.", getName());
operatorChain = new OperatorChain<>(this, recordWriter);
// Debugger view of mainOperator for the Socket Stream -> Flat Map source task above:
// mainOperator = {StreamSource@6752}
// ctx = null
// canceledOrStopped = false
// hasSentMaxWatermark = false
// userFunction = {SocketTextStreamFunction@6754}
// functionsClosed = false
// chainingStrategy = {ChainingStrategy@6755} "HEAD"
// container = {SourceStreamTask@6554} "Source: Socket Stream -> Flat Map (1/1)#0"
// config = {StreamConfig@6756} "\\n=======================Stream Config=======================\\nNumber of non-chained inputs: 0\\nNumber of non-chained outputs: 0\\nOutput names: []\\nPartitioning:\\nChained subtasks: [(Source: Socket Stream-1 -> Flat Map-2, typeNumber=0, outputPartitioner=FORWARD, bufferTimeout=-1, outputTag=null)]\\nOperator: SimpleUdfStreamOperatorFactory\\nState Monitoring: false\\n\\n\\n---------------------\\nChained task configs\\n---------------------\\n{2=\\n=======================Stream Config=======================\\nNumber of non-chained inputs: 0\\nNumber of non-chained outputs: 1\\nOutput names: [(Flat Map-2 -> Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)-4, typeNumber=0, outputPartitioner=HASH, bufferTimeout=-1, outputTag=null)]\\nPartitioning:\\n\\t4: HASH\\nChained subtasks: []\\nOperator: SimpleUdfStreamOperatorFactory\\nState Monitoring: false}"
// output = {CountingOutput@6757}
// runtimeContext = {StreamingRuntimeContext@6758}
// stateKeySelector1 = null
// stateKeySelector2 = null
// stateHandler = null
// timeServiceManager = null
// metrics = {OperatorMetricGroup@6759}
// latencyStats = {LatencyStats@6760}
// processingTimeService = {ProcessingTimeServiceImpl@6761}
// combinedWatermark = -9223372036854775808
// input1Watermark = -9223372036854775808
// input2Watermark = -9223372036854775808
mainOperator = operatorChain.getMainOperator();
// task specific initialization
init();
// save the work of reloading state, etc, if the task is already canceled
if (canceled) {
throw new CancelTaskException();
}
// -------- Invoke --------
// Invoking Source: Socket Stream -> Flat Map (1/1)#0
LOG.debug("Invoking {}", getName());
// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
actionExecutor.runThrowing(
() -> {
SequentialChannelStateReader reader =
getEnvironment()
.getTaskStateManager()
.getSequentialChannelStateReader();
// TODO: for UC rescaling, reenable notifyAndBlockOnCompletion for non-iterative
// jobs
reader.readOutputData(getEnvironment().getAllWriters(), false);
operatorChain.initializeStateAndOpenOperators(
createStreamTaskStateInitializer());
channelIOExecutor.execute(
() -> {
try {
reader.readInputData(getEnvironment().getAllInputGates());
} catch (Exception e) {
asyncExceptionHandler.handleAsyncException(
"Unable to read channel state", e);
}
});
for (InputGate inputGate : getEnvironment().getAllInputGates()) {
inputGate
.getStateConsumedFuture()
.thenRun(
() ->
mainMailboxExecutor.execute(
inputGate::requestPartitions,
"Input gate request partitions"));
}
});
isRunning = true;
}
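Note how beforeInvoke() splits the recovery work across threads: reader.readInputData(...) runs on channelIOExecutor, and only when an input gate's getStateConsumedFuture() completes is inputGate::requestPartitions posted back to mainMailboxExecutor, so it executes on the task thread. A minimal sketch of that hand-off, assuming a plain BlockingQueue as the mailbox and a single CompletableFuture instead of one per input gate (RecoveryDemo is hypothetical).

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class RecoveryDemo {
    // Returns a log of which thread read the channel state and which requested partitions.
    static List<String> run() throws InterruptedException {
        BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>(); // stand-in for the task mailbox
        ExecutorService channelIOExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "channel-state-unspilling"));
        CompletableFuture<Void> stateConsumed = new CompletableFuture<>(); // like getStateConsumedFuture()
        List<String> log = new CopyOnWriteArrayList<>();

        // 1. recovered in-flight input data is read off the task thread
        channelIOExecutor.execute(() -> {
            log.add("readInputData on " + Thread.currentThread().getName());
            stateConsumed.complete(null);
        });

        // 2. once the state is consumed, requesting partitions is posted back as a mail
        stateConsumed.thenRun(() -> mailbox.add(
                () -> log.add("requestPartitions on " + Thread.currentThread().getName())));

        // 3. the task thread (here: the caller) takes the mail and runs it
        Runnable mail = mailbox.poll(5, TimeUnit.SECONDS);
        channelIOExecutor.shutdown();
        if (mail == null) {
            throw new IllegalStateException("no mail arrived");
        }
        mail.run();
        return log;
    }
}
```

Routing requestPartitions through the mailbox keeps it on the task thread, so it never races with the task's own input processing, while the potentially slow channel-state reading stays off that thread.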
3.2.2. runMailboxLoop
public void runMailboxLoop() throws Exception {
// delegates to the MailboxProcessor, whose default action is processInput
mailboxProcessor.runMailboxLoop();
}
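Conceptually, the mailbox loop alternates between running pending mails (actions submitted from other threads, such as checkpoint triggers or timers) and running the default action (processInput), all on the task thread. The sketch below is a simplified, hypothetical model (MiniMailboxProcessor is not Flink's MailboxProcessor); in particular, signalling "no more input" through a boolean return value is an assumption made for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified model of a mailbox loop.
final class MiniMailboxProcessor {
    // Default action: returns false once there is no more input (assumption).
    interface DefaultAction {
        boolean runDefaultAction();
    }

    private final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    private final DefaultAction defaultAction;

    MiniMailboxProcessor(DefaultAction defaultAction) {
        this.defaultAction = defaultAction;
    }

    // May be called from any thread; the mail itself runs on the loop's thread.
    void submit(Runnable mail) {
        mailbox.add(mail);
    }

    void runMailboxLoop() {
        boolean running = true;
        while (running) {
            // 1. run all pending mails first
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();
            }
            // 2. then process one unit of input
            running = defaultAction.runDefaultAction();
        }
    }
}

final class MailboxDemo {
    // Three input records; a "timer" mail is queued up front, a "checkpoint" mail mid-way.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        int[] remaining = {3};
        MiniMailboxProcessor[] self = new MiniMailboxProcessor[1];
        self[0] = new MiniMailboxProcessor(() -> {
            int record = remaining[0]--;
            log.add("processInput " + record);
            if (record == 2) {
                // a mail queued while processing input runs before the next record
                self[0].submit(() -> log.add("checkpoint"));
            }
            return remaining[0] > 0;
        });
        self[0].submit(() -> log.add("timer"));
        self[0].runMailboxLoop();
        return log;
    }
}
```

With this scheme the log reads timer, processInput 3, processInput 2, checkpoint, processInput 1: mails never interleave with a record in progress, which is what lets the mailbox replace the checkpoint lock.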