Flink Runtime 1.0 Notes: Plan 2 Task
Posted 张包峰
About
I will try to outline how Flink turns the logical plan into the physical plan, and the physical plan into tasks.
The main classes and methods are mentioned along the way.
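Format explanation: class names are written as in the code (e.g. JobGraph), methods end with parentheses (e.g. attachJobGraph()), and constants are in upper case (e.g. ALL_TO_ALL).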
Logical Plan 2 Physical Plan 2 Task
JobGraph -> ExecutionGraph, on the JobManager
The JobManager:
- receives the JobGraph submitted from the Client, using ZooKeeper to persist it
- attachJobGraph(): adds the JobVertex instances to the ExecutionGraph and does the transformation
- scheduleForExecution(): gives the Scheduler to the ExecutionGraph and kicks off the tasks
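The three JobManager steps can be pictured with the toy Java model below. It is a sketch under my own assumptions, not Flink's JobManager: JobSubmissionSketch, JobGraphStore, Graph, kickOff() and the Scheduler interface used here are hypothetical stand-ins, and vertices are plain strings. It only shows the order: persist, attachJobGraph(), scheduleForExecution().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the JobManager submission path (not Flink code).
final class JobSubmissionSketch {

    // Stand-in for the store that persists a submitted JobGraph (ZooKeeper in the notes).
    interface JobGraphStore {
        void persist(String jobName);
    }

    // Hypothetical stand-in for the Scheduler handed to the ExecutionGraph.
    interface Scheduler {
        void kickOff(String task);
    }

    // Stand-in for ExecutionGraph: one entry per attached JobVertex.
    static final class Graph {
        final List<String> tasks = new ArrayList<>();

        // attachJobGraph(): transform every JobVertex into an ExecutionJobVertex.
        void attachJobGraph(List<String> jobVertices) {
            for (String vertex : jobVertices) {
                tasks.add("ExecutionJobVertex(" + vertex + ")");
            }
        }

        // scheduleForExecution(): hand the tasks to the Scheduler
        // (simplified: every task is kicked off, as in ALL mode).
        void scheduleForExecution(Scheduler scheduler) {
            for (String task : tasks) {
                scheduler.kickOff(task);
            }
        }
    }

    // Receive the JobGraph, persist it, attach it, then schedule it.
    static Graph submitJob(String jobName, List<String> jobVertices,
                           JobGraphStore store, Scheduler scheduler) {
        store.persist(jobName);
        Graph graph = new Graph();
        graph.attachJobGraph(jobVertices);
        graph.scheduleForExecution(scheduler);
        return graph;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        submitJob("wordcount", Arrays.asList("source", "map", "sink"),
                name -> events.add("persist " + name),
                task -> events.add("schedule " + task));
        events.forEach(System.out::println);
    }
}
```

Running main prints "persist wordcount" first, followed by one "schedule ExecutionJobVertex(...)" line per vertex, in order.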
Logical Plan
JobGraph is composed of a map of JobVertex, called taskVertices.
JobVertex is composed of a list of IntermediateDataSet (the data sets it produces), a list of JobEdge as inputs, an operatorName, an invokableClassName, and the parallelism.
JobEdge has a DistributionPattern and one IntermediateDataSet as its source.
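To make the logical plan concrete, here is a minimal Java mirror of the classes just described, assuming heavily simplified fields. It is not Flink's API: connectAsInput() is a hypothetical helper, and the taskVertices map is keyed by operator name only for readability.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified mirror of the logical plan classes (not Flink's own classes).
final class LogicalPlanSketch {

    enum DistributionPattern { ALL_TO_ALL, POINTWISE }

    // A data set produced by one JobVertex.
    static final class IntermediateDataSet {
        final JobVertex producer;

        IntermediateDataSet(JobVertex producer) {
            this.producer = producer;
        }
    }

    // An input edge of 'target': it reads 'source' with the given DistributionPattern.
    static final class JobEdge {
        final IntermediateDataSet source;
        final JobVertex target;
        final DistributionPattern pattern;

        JobEdge(IntermediateDataSet source, JobVertex target, DistributionPattern pattern) {
            this.source = source;
            this.target = target;
            this.pattern = pattern;
        }
    }

    static final class JobVertex {
        final String operatorName;
        final String invokableClassName;
        final int parallelism;
        final List<IntermediateDataSet> results = new ArrayList<>(); // produced data sets
        final List<JobEdge> inputs = new ArrayList<>();

        JobVertex(String operatorName, String invokableClassName, int parallelism) {
            this.operatorName = operatorName;
            this.invokableClassName = invokableClassName;
            this.parallelism = parallelism;
        }

        // Hypothetical helper: 'upstream' produces a new data set that this vertex consumes.
        JobEdge connectAsInput(JobVertex upstream, DistributionPattern pattern) {
            IntermediateDataSet dataSet = new IntermediateDataSet(upstream);
            upstream.results.add(dataSet);
            JobEdge edge = new JobEdge(dataSet, this, pattern);
            inputs.add(edge);
            return edge;
        }
    }

    // JobGraph: the taskVertices map (keyed by operator name here for readability).
    static final class JobGraph {
        final Map<String, JobVertex> taskVertices = new LinkedHashMap<>();

        void addVertex(JobVertex vertex) {
            taskVertices.put(vertex.operatorName, vertex);
        }
    }

    public static void main(String[] args) {
        JobVertex source = new JobVertex("source", "SourceInvokable", 2);
        JobVertex map = new JobVertex("map", "MapInvokable", 4);
        map.connectAsInput(source, DistributionPattern.POINTWISE);
        JobGraph graph = new JobGraph();
        graph.addVertex(source);
        graph.addVertex(map);
        System.out.println(graph.taskVertices.keySet()); // [source, map]
    }
}
```

The producer keeps the data set in its results list, while the consumer keeps the edge in its inputs list, matching the split between "produced" and "input" described above.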
Physical Plan
ExecutionGraph is composed of a map of ExecutionJobVertex as its tasks.
ExecutionJobVertex holds a reference to the whole ExecutionGraph, one JobVertex, an array of ExecutionVertex as the taskVertices, a list of IntermediateResult as inputs, and an array of IntermediateResult as the producedDataSets. These are initialized from the JobVertex during construction.
ExecutionVertex instances are created during the construction of ExecutionJobVertex, as many as the vertex parallelism. Each one is a single subtask of the execution, and the Execution inside it manages the actual execution.
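The expansion from one ExecutionJobVertex into parallelism-many ExecutionVertex subtasks can be sketched as follows. Execution is reduced to an attempt number, and nameWithSubtaskIndex() with its "name (i/n)" format is a hypothetical helper for printing; treat the whole block as an illustration, not Flink code.

```java
// Hypothetical sketch: an ExecutionJobVertex expands into 'parallelism' ExecutionVertex subtasks.
final class ExpansionSketch {

    // Stand-in for Execution: one attempt at running a subtask.
    static final class Execution {
        final int attemptNumber;

        Execution(int attemptNumber) {
            this.attemptNumber = attemptNumber;
        }
    }

    // One parallel subtask of a JobVertex.
    static final class ExecutionVertex {
        final String taskName;
        final int subtaskIndex;
        final int parallelism;
        Execution currentExecution = new Execution(0); // the first attempt

        ExecutionVertex(String taskName, int subtaskIndex, int parallelism) {
            this.taskName = taskName;
            this.subtaskIndex = subtaskIndex;
            this.parallelism = parallelism;
        }

        // e.g. "map (1/3)" for the first of three subtasks
        String nameWithSubtaskIndex() {
            return taskName + " (" + (subtaskIndex + 1) + "/" + parallelism + ")";
        }
    }

    static final class ExecutionJobVertex {
        final String jobVertexName;
        final ExecutionVertex[] taskVertices;

        // Create one ExecutionVertex per unit of parallelism.
        ExecutionJobVertex(String jobVertexName, int parallelism) {
            if (parallelism < 1) {
                throw new IllegalArgumentException("parallelism must be at least 1");
            }
            this.jobVertexName = jobVertexName;
            this.taskVertices = new ExecutionVertex[parallelism];
            for (int i = 0; i < parallelism; i++) {
                taskVertices[i] = new ExecutionVertex(jobVertexName, i, parallelism);
            }
        }
    }

    public static void main(String[] args) {
        ExecutionJobVertex map = new ExecutionJobVertex("map", 3);
        for (ExecutionVertex vertex : map.taskVertices) {
            System.out.println(vertex.nameWithSubtaskIndex());
        }
    }
}
```

Running main prints map (1/3), map (2/3) and map (3/3), one line per subtask.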
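Besides, ExecutionEdge instances are created during ExecutionJobVertex.connectToPredecessors(): each JobEdge is mapped to ExecutionEdges in ExecutionVertex.connectSource(), which is where the DistributionPattern takes effect.
If the DistributionPattern is ALL_TO_ALL, each ExecutionVertex gets one ExecutionEdge per source partition, i.e. it reads from every partition of the source. If it is POINTWISE, the parallelism is compared with the number of source partitions: if they are equal, the connection is one-to-one; if the parallelism is bigger, each ExecutionVertex gets exactly one ExecutionEdge, so several subtasks share one source partition; otherwise, each ExecutionVertex gets ExecutionEdges to several source partitions (one-to-many).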
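The connection rules can be written as a small function that returns, for one consumer subtask, the indices of the source partitions it gets an ExecutionEdge to. This is a sketch of the rules as described in these notes, not a copy of ExecutionVertex.connectSource(); the method name is hypothetical, and the exact index arithmetic (integer division that spreads partitions as evenly as possible) is my assumption.

```java
import java.util.Arrays;

// Sketch of which source partitions one consumer subtask reads, per the rules above.
final class EdgeConnectionSketch {

    enum DistributionPattern { ALL_TO_ALL, POINTWISE }

    // Returns the source partition indices for consumer 'subtaskIndex';
    // one ExecutionEdge would be created per returned index.
    static int[] sourcePartitionsFor(DistributionPattern pattern, int numSources,
                                     int parallelism, int subtaskIndex) {
        if (numSources < 1 || parallelism < 1 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid sizes or subtask index");
        }
        if (pattern == DistributionPattern.ALL_TO_ALL) {
            // Every consumer reads every source partition.
            int[] all = new int[numSources];
            for (int i = 0; i < numSources; i++) {
                all[i] = i;
            }
            return all;
        }
        if (numSources == parallelism) {
            // POINTWISE, equal sizes: one-to-one, subtask i reads partition i.
            return new int[] {subtaskIndex};
        }
        if (numSources < parallelism) {
            // POINTWISE, more consumers than partitions: exactly one partition each,
            // shared by neighbouring consumers.
            return new int[] {(int) ((long) subtaskIndex * numSources / parallelism)};
        }
        // POINTWISE, fewer consumers than partitions: a contiguous range of partitions each.
        int start = (int) ((long) subtaskIndex * numSources / parallelism);
        int end = (int) ((long) (subtaskIndex + 1) * numSources / parallelism);
        int[] range = new int[end - start];
        for (int i = 0; i < range.length; i++) {
            range[i] = start + i;
        }
        return range;
    }

    public static void main(String[] args) {
        // 2 source partitions feeding 4 consumers, then 5 partitions feeding 2 consumers.
        for (int i = 0; i < 4; i++) {
            System.out.println(Arrays.toString(
                    sourcePartitionsFor(DistributionPattern.POINTWISE, 2, 4, i)));
        }
        for (int i = 0; i < 2; i++) {
            System.out.println(Arrays.toString(
                    sourcePartitionsFor(DistributionPattern.POINTWISE, 5, 2, i)));
        }
    }
}
```

With 2 partitions and 4 consumers, main prints [0], [0], [1], [1]; with 5 partitions and 2 consumers it prints [0, 1] and [2, 3, 4]. The ranges in the last case are disjoint and together cover every partition exactly once.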
Schedule 4 Execution
Once all preparation phases are done, the JobManager invokes ExecutionGraph.scheduleForExecution(), passing in the Scheduler it was given at construction.
The default schedule mode is FROM_SOURCES: the vertices are traversed and ExecutionJobVertex.scheduleAll() is called only on the input (source) vertices. In ALL mode, all vertices are scheduled.
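The choice between the two schedule modes boils down to a filter over the vertices. In this sketch only FROM_SOURCES and ALL come from the notes; the Vertex class, its isInputVertex flag and verticesToSchedule() are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified sketch of which vertices get scheduleAll() called right away.
final class ScheduleModeSketch {

    enum ScheduleMode { FROM_SOURCES, ALL }

    // Hypothetical vertex: a name plus whether it has no inputs (an input/source vertex).
    static final class Vertex {
        final String name;
        final boolean isInputVertex;

        Vertex(String name, boolean isInputVertex) {
            this.name = name;
            this.isInputVertex = isInputVertex;
        }
    }

    static List<String> verticesToSchedule(ScheduleMode mode, List<Vertex> vertices) {
        List<String> scheduled = new ArrayList<>();
        for (Vertex vertex : vertices) {
            // FROM_SOURCES: only input vertices; ALL: every vertex.
            if (mode == ScheduleMode.ALL || vertex.isInputVertex) {
                scheduled.add(vertex.name);
            }
        }
        return scheduled;
    }

    public static void main(String[] args) {
        List<Vertex> vertices = Arrays.asList(
                new Vertex("source", true), new Vertex("map", false), new Vertex("sink", false));
        System.out.println(verticesToSchedule(ScheduleMode.FROM_SOURCES, vertices)); // [source]
        System.out.println(verticesToSchedule(ScheduleMode.ALL, vertices)); // [source, map, sink]
    }
}
```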
The real tasks inside an ExecutionJobVertex are its ExecutionVertex instances: preferred location hosts may be set according to instance info from the Scheduler, and then ExecutionVertex.scheduleForExecution() is invoked on each of them.
Inside ExecutionVertex, Execution.scheduleForExecution() is invoked, which gets a slot from the Scheduler and deploys the task onto the assigned slot, using either queued scheduling or immediate scheduling (the default is not queued).
Based on the vertex's SlotSharingGroup and CoLocationConstraint, a ScheduledUnit is created and handed to the Scheduler, which does the actual scheduling. Taking the preferred locations and the sharing group into account, the Scheduler finally returns a Slot to the Execution, which then calls deployToSlot().
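The "preferred location" part of the slot request can be illustrated with a simple locality-first allocator. This is a hypothetical sketch, not Flink's Scheduler: it ignores SlotSharingGroup and CoLocationConstraint entirely, and Slot and allocate() are made-up names. It takes a free slot on a preferred host if one exists and otherwise falls back to any free slot.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of locality-aware slot choice (not Flink's Scheduler).
final class SlotSelectionSketch {

    static final class Slot {
        final String host;
        final int index;
        boolean allocated = false;

        Slot(String host, int index) {
            this.host = host;
            this.index = index;
        }

        @Override
        public String toString() {
            return host + "#" + index;
        }
    }

    // Prefer a free slot on a preferred host, else take any free slot.
    // Returns null when nothing is free: immediate scheduling would give up at this point,
    // whereas queued scheduling would wait for a slot to be released.
    static Slot allocate(List<Slot> slots, Set<String> preferredHosts) {
        Slot fallback = null;
        for (Slot slot : slots) {
            if (slot.allocated) {
                continue;
            }
            if (preferredHosts.contains(slot.host)) {
                slot.allocated = true;
                return slot;
            }
            if (fallback == null) {
                fallback = slot;
            }
        }
        if (fallback != null) {
            fallback.allocated = true;
        }
        return fallback;
    }

    public static void main(String[] args) {
        List<Slot> slots = new ArrayList<>(Arrays.asList(new Slot("hostA", 0), new Slot("hostB", 0)));
        Set<String> preferred = new HashSet<>(Collections.singletonList("hostB"));
        System.out.println(allocate(slots, preferred)); // hostB#0 (local)
        System.out.println(allocate(slots, preferred)); // hostA#0 (non-local fallback)
        System.out.println(allocate(slots, preferred)); // null (no free slot)
    }
}
```

The null return is only a placeholder for "no resource available"; how the real runtime reports or waits for that case is not modeled here.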
deployToSlot() distributes the task. First, the ExecutionVertex creates a TaskDeploymentDescriptor, which contains the job info, task info, configuration, invokable class name, jar files, etc. Second, a SubmitTask message is sent to the Instance that owns the Slot, i.e. to its TaskManager.
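The two steps of deployToSlot() can be sketched as "build an immutable descriptor, then send it as a message". The field list is a simplified guess based on the notes (job info, task info, configuration, invokable class name, jar files), and SlotOwner is a hypothetical stand-in for the connection to the TaskManager; none of this is Flink's real TaskDeploymentDescriptor or SubmitTask class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of deployToSlot(): build a descriptor, then send it to the slot's owner.
final class DeploySketch {

    // Immutable bundle of what the TaskManager needs to start the task (fields simplified).
    static final class TaskDeploymentDescriptor {
        final String jobId;
        final String taskName;
        final int subtaskIndex;
        final String invokableClassName;
        final Map<String, String> configuration;
        final List<String> requiredJarFiles;

        TaskDeploymentDescriptor(String jobId, String taskName, int subtaskIndex,
                                 String invokableClassName, Map<String, String> configuration,
                                 List<String> requiredJarFiles) {
            this.jobId = jobId;
            this.taskName = taskName;
            this.subtaskIndex = subtaskIndex;
            this.invokableClassName = invokableClassName;
            // Defensive copies keep the descriptor immutable once sent.
            this.configuration = Collections.unmodifiableMap(new HashMap<String, String>(configuration));
            this.requiredJarFiles = Collections.unmodifiableList(new ArrayList<String>(requiredJarFiles));
        }
    }

    // The message that carries the descriptor to the TaskManager.
    static final class SubmitTask {
        final TaskDeploymentDescriptor descriptor;

        SubmitTask(TaskDeploymentDescriptor descriptor) {
            this.descriptor = descriptor;
        }
    }

    // Hypothetical stand-in for the Instance (TaskManager) that owns the slot.
    interface SlotOwner {
        void send(SubmitTask message);
    }

    // Step 1: create the descriptor; step 2: send SubmitTask to the slot's owner.
    static void deployToSlot(String jobId, String taskName, int subtaskIndex,
                             String invokableClassName, List<String> jars, SlotOwner owner) {
        TaskDeploymentDescriptor descriptor = new TaskDeploymentDescriptor(
                jobId, taskName, subtaskIndex, invokableClassName,
                new HashMap<String, String>(), jars);
        owner.send(new SubmitTask(descriptor));
    }

    public static void main(String[] args) {
        List<SubmitTask> inbox = new ArrayList<>();
        deployToSlot("job-1", "map", 0, "MapInvokable", Arrays.asList("job.jar"), inbox::add);
        System.out.println(inbox.get(0).descriptor.invokableClassName); // MapInvokable
    }
}
```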
Task Launched
Once the TaskManager receives the SubmitTask message, a Task is created and started, and an acknowledgement message is sent back to the sender.
Once started, the Task does a handful of things:
- checking and updating the ExecutionState
- loading the classes and the invokable
- setting up the network resources, which are controlled by NetworkEnvironment
- creating a TaskInputSplitProvider to ask for the NextInputSplit (only relevant for source tasks)
- creating a RuntimeEnvironment and setting it on the invokable; managers like MemoryManager, IOManager, and BroadcastVariableManager all sit there
- creating a StateHandle and setting it on the invokable
- invoking the invokable
Task is a wrapper that runs the invokable in its own thread. More specific details such as exception handling, lifecycle management, and monitoring are omitted here.
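A stripped-down picture of the Task wrapper: a Runnable that checks and updates its ExecutionState, runs the invokable in its own thread, and records a failure cause. It is a hypothetical sketch; the ExecutionState enum here is a simplified subset, and the class-loading, network and environment setup steps from the list are only marked by a comment.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a Task: a Runnable wrapper that runs one invokable in its own thread.
final class TaskSketch implements Runnable {

    // Simplified subset of execution states.
    enum ExecutionState { CREATED, DEPLOYING, RUNNING, FINISHED, FAILED }

    private final AtomicReference<ExecutionState> state =
            new AtomicReference<>(ExecutionState.CREATED);
    private final Runnable invokable;
    private volatile Throwable failureCause;

    TaskSketch(Runnable invokable) {
        this.invokable = invokable;
    }

    ExecutionState getState() {
        return state.get();
    }

    Throwable getFailureCause() {
        return failureCause;
    }

    // Start the wrapper in a dedicated thread.
    Thread startTaskThread(String threadName) {
        Thread thread = new Thread(this, threadName);
        thread.start();
        return thread;
    }

    @Override
    public void run() {
        // Check and update the state first; a task can only start once.
        if (!state.compareAndSet(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
            return;
        }
        try {
            // The real runtime would load classes, set up network resources, the
            // RuntimeEnvironment and so on here, before switching to RUNNING.
            state.set(ExecutionState.RUNNING);
            invokable.run(); // invoke the invokable
            state.set(ExecutionState.FINISHED);
        } catch (Throwable t) {
            failureCause = t;
            state.set(ExecutionState.FAILED);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TaskSketch task = new TaskSketch(() -> System.out.println("invokable running"));
        task.startTaskThread("map (1/1)").join();
        System.out.println(task.getState()); // FINISHED
    }
}
```

Running main prints "invokable running" and then FINISHED; an invokable that throws leaves the task in FAILED with the exception stored as its failure cause.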
:)