Flink Runtime 1.0 Notes: Plan 2 Task

Posted 张包峰

About

These notes trace the main line of how Flink turns a logical plan into a physical plan and then into tasks.

Main classes and methods are mentioned.

Format explanation

  • this is Class

  • this is method()

  • this is constant

Logical Plan 2 Physical Plan 2 Task

JobGraph -> ExecutionGraph, on JobManager

  • JobManager receives the JobGraph submitted by the Client and uses ZooKeeper to persist it

  • attachJobGraph() adds each JobVertex to the ExecutionGraph and performs the transformation

  • scheduleForExecution() hands the Scheduler to the ExecutionGraph and kicks off the tasks

Logical Plan

JobGraph is composed of a map of JobVertex, which is called taskVertices.

JobVertex is composed of a list of IntermediateDataSet as its results, a list of JobEdge as its inputs, an operatorName, an invokableClassName, and the parallelism.

JobEdge has a DistributionPattern and one IntermediateDataSet as its source.
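To make the shape of the logical plan concrete, here is a minimal sketch of the three structures described above. All class and method names (LogicalPlanSketch, VertexSketch, connectTo, and so on) are hypothetical stand-ins, not Flink's real classes, and the map is keyed by operator name only for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for JobGraph / JobVertex / JobEdge / IntermediateDataSet.
class LogicalPlanSketch {

    enum Pattern { ALL_TO_ALL, POINTWISE }

    // A data set produced by exactly one vertex.
    static class DataSetSketch {
        final VertexSketch producer;
        DataSetSketch(VertexSketch producer) { this.producer = producer; }
    }

    // An input edge: one source data set plus a distribution pattern.
    static class EdgeSketch {
        final DataSetSketch source;
        final Pattern pattern;
        EdgeSketch(DataSetSketch source, Pattern pattern) {
            this.source = source;
            this.pattern = pattern;
        }
    }

    // A vertex: produced data sets, input edges, a name and a parallelism.
    static class VertexSketch {
        final String operatorName;
        final int parallelism;
        final List<DataSetSketch> results = new ArrayList<>();
        final List<EdgeSketch> inputs = new ArrayList<>();

        VertexSketch(String operatorName, int parallelism) {
            this.operatorName = operatorName;
            this.parallelism = parallelism;
        }

        // The upstream vertex gains a produced data set,
        // this vertex gains an input edge that reads it.
        void connectTo(VertexSketch upstream, Pattern pattern) {
            DataSetSketch dataSet = new DataSetSketch(upstream);
            upstream.results.add(dataSet);
            inputs.add(new EdgeSketch(dataSet, pattern));
        }
    }

    // The graph itself is just a map of vertices ("taskVertices").
    static Map<String, VertexSketch> buildExample() {
        VertexSketch source = new VertexSketch("source", 2);
        VertexSketch map = new VertexSketch("map", 4);
        map.connectTo(source, Pattern.POINTWISE);

        Map<String, VertexSketch> taskVertices = new LinkedHashMap<>();
        taskVertices.put(source.operatorName, source);
        taskVertices.put(map.operatorName, map);
        return taskVertices;
    }

    public static void main(String[] args) {
        VertexSketch map = buildExample().get("map");
        EdgeSketch edge = map.inputs.get(0);
        System.out.println(map.operatorName + " reads from "
                + edge.source.producer.operatorName + " via " + edge.pattern);
    }
}
```

The point of the sketch is only the direction of the references: an edge belongs to the consuming vertex and points back at a data set owned by the producing vertex.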

Physical Plan

ExecutionGraph is composed of a map of ExecutionJobVertex as its tasks.

ExecutionJobVertex has a reference to the whole ExecutionGraph, one JobVertex, an array of ExecutionVertex as the taskVertices, a list of IntermediateResult as inputs, and an array of IntermediateResult as the producedDataSets. These are initialized from the JobVertex during construction.

ExecutionVertex is created during the construction of ExecutionJobVertex, one per unit of the vertex's parallelism. It is a single subtask of the execution. The Execution inside it manages the real execution.
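The expansion from one vertex to its parallel subtasks can be sketched as follows. This is an illustration under simplifying assumptions: ExpansionSketch, SubtaskSketch, and expand() are hypothetical names, and the real ExecutionJobVertex also wires up intermediate results, which is left out here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one logical vertex becomes `parallelism` subtasks.
class ExpansionSketch {

    // Stand-in for a single parallel subtask of a vertex.
    static class SubtaskSketch {
        final String vertexName;
        final int subtaskIndex;   // zero-based
        final int parallelism;

        SubtaskSketch(String vertexName, int subtaskIndex, int parallelism) {
            this.vertexName = vertexName;
            this.subtaskIndex = subtaskIndex;
            this.parallelism = parallelism;
        }

        // Human-readable name with a one-based index, e.g. "map (2/4)".
        String describe() {
            return vertexName + " (" + (subtaskIndex + 1) + "/" + parallelism + ")";
        }
    }

    // Create one subtask per unit of parallelism.
    static List<SubtaskSketch> expand(String vertexName, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        List<SubtaskSketch> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new SubtaskSketch(vertexName, i, parallelism));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        for (SubtaskSketch subtask : expand("map", 4)) {
            System.out.println(subtask.describe());
        }
    }
}
```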

Besides, ExecutionEdge is created during ExecutionJobVertex.connectToPredecessors(). Each JobEdge is mapped to ExecutionEdges in ExecutionVertex.connectSource(), which is where the DistributionPattern takes effect.

If the DistributionPattern is ALL_TO_ALL, each ExecutionVertex gets one ExecutionEdge per source partition. If it is POINTWISE, the parallelism is compared with the number of source partitions: if they are equal, the connection is one-to-one; if the parallelism is larger, each subtask connects to a single source partition, so one partition may feed several subtasks; otherwise each subtask connects to several source partitions (one-to-many).
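The two patterns can be sketched as a function from a subtask index to the source partitions it reads. This is a simplified illustration that uses plain integer arithmetic to split partitions; it is not Flink's exact code, and DistributionSketch and sourcePartitionsFor() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how a distribution pattern picks source partitions.
class DistributionSketch {

    enum Pattern { ALL_TO_ALL, POINTWISE }

    // Returns the indices of the source partitions that the given subtask reads.
    static List<Integer> sourcePartitionsFor(
            Pattern pattern, int numSources, int parallelism, int subtaskIndex) {
        List<Integer> result = new ArrayList<>();
        if (pattern == Pattern.ALL_TO_ALL) {
            // Every subtask reads every source partition.
            for (int i = 0; i < numSources; i++) {
                result.add(i);
            }
        } else if (numSources == parallelism) {
            // One-to-one.
            result.add(subtaskIndex);
        } else if (numSources < parallelism) {
            // More subtasks than partitions: each subtask reads exactly one,
            // and neighbouring subtasks share a partition.
            result.add(subtaskIndex * numSources / parallelism);
        } else {
            // More partitions than subtasks: each subtask reads a contiguous range.
            int start = subtaskIndex * numSources / parallelism;
            int end = (subtaskIndex + 1) * numSources / parallelism;
            for (int i = start; i < end; i++) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 4 source partitions consumed by 2 subtasks, pointwise.
        for (int subtask = 0; subtask < 2; subtask++) {
            System.out.println("subtask " + subtask + " reads "
                    + sourcePartitionsFor(Pattern.POINTWISE, 4, 2, subtask));
        }
    }
}
```

With 4 source partitions and 2 pointwise subtasks, subtask 0 reads partitions 0 and 1 and subtask 1 reads partitions 2 and 3, while ALL_TO_ALL would give both subtasks all four.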

Schedule 4 Execution

Once all preparation phases are done, JobManager invokes ExecutionGraph.scheduleForExecution(), passing the Scheduler it was constructed with.

The default schedule mode is FROM_SOURCES: traverse the vertices and call ExecutionJobVertex.scheduleAll() only on the input vertices. In ALL mode, all vertices are scheduled.
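The difference between the two modes comes down to a filter over the vertices. Here is a small sketch, assuming a vertex counts as an input vertex when it has no input edges; ScheduleModeSketch and verticesToSchedule() are hypothetical names, and the graph is reduced to a map from vertex name to its number of inputs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of choosing which vertices to schedule up front.
class ScheduleModeSketch {

    enum Mode { FROM_SOURCES, ALL }

    // inputCounts maps a vertex name to its number of input edges.
    static List<String> verticesToSchedule(Map<String, Integer> inputCounts, Mode mode) {
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : inputCounts.entrySet()) {
            boolean isInputVertex = entry.getValue() == 0;
            // ALL schedules everything; FROM_SOURCES only the input vertices.
            if (mode == Mode.ALL || isInputVertex) {
                selected.add(entry.getKey());
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        Map<String, Integer> inputCounts = new LinkedHashMap<>();
        inputCounts.put("source", 0);
        inputCounts.put("map", 1);
        inputCounts.put("sink", 1);
        System.out.println(verticesToSchedule(inputCounts, Mode.FROM_SOURCES));
        System.out.println(verticesToSchedule(inputCounts, Mode.ALL));
    }
}
```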

The real tasks inside are the ExecutionVertex instances. Location hosts may be set according to the instance info from the scheduler, and then scheduleForExecution() is invoked on each ExecutionVertex.

Inside ExecutionVertex, Execution.scheduleForExecution() is invoked. It gets a slot from the scheduler and deploys onto the assigned slot, whether the scheduling is queued or immediate (the default is not queued).

A ScheduledUnit is created according to the vertex's SlotSharingGroup and CoLocationConstraint and is handed to the Scheduler, which does the actual task scheduling. Taking the preferred locations and the sharing group into account, a Slot is finally returned to the Execution, which calls deployToSlot().

deployToSlot() distributes the task. First, the ExecutionVertex creates a TaskDeploymentDescriptor, which contains job info, task info, configuration, class name, jar files, etc. Second, a SubmitTask message is sent to the target Instance of the Slot, and from there it reaches the TaskManager.

Task Launched

Once the TaskManager receives the SubmitTask message, a Task is created and started, and an ack message is sent back to the sender.

Once started, a handful of things are done:

  1. checking and updating ExecutionState
  2. loading classes and the invokable
  3. setting up the network, which is controlled by NetworkEnvironment
  4. creating TaskInputSplitProvider to ask for the next input split (relevant when the task is a source)
  5. RuntimeEnvironment created and set to invokable; managers like MemoryManager, IOManager, and BroadcastVariableManager all sit there
  6. StateHandle created and set to invokable
  7. invokable invoked

Task is a wrapper that runs as a thread. Its more specific exception handling, lifecycle management, and monitoring are omitted here.
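The "wrapper as a thread" idea can be sketched as a Runnable that owns its thread and guards each step with an atomic state transition. This is a minimal illustration, not Flink's Task class: TaskSketch and its methods are hypothetical, the state enum is a reduced subset, and the setup steps from the list above are only marked by a comment.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a task that wraps an invokable in its own thread.
class TaskSketch implements Runnable {

    // Reduced set of states for illustration.
    enum State { CREATED, DEPLOYING, RUNNING, FINISHED, FAILED }

    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);
    private final Runnable invokable;
    private final Thread executingThread;

    TaskSketch(String name, Runnable invokable) {
        this.invokable = invokable;
        this.executingThread = new Thread(this, name);
    }

    void startTaskThread() {
        executingThread.start();
    }

    // Block until the task thread has ended.
    void awaitTermination() {
        try {
            executingThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    State getState() {
        return state.get();
    }

    @Override
    public void run() {
        // Only a freshly created task may start deploying.
        if (!state.compareAndSet(State.CREATED, State.DEPLOYING)) {
            return;
        }
        try {
            // Class loading, network setup and environment creation would go here.
            if (!state.compareAndSet(State.DEPLOYING, State.RUNNING)) {
                return;
            }
            invokable.run();
            state.compareAndSet(State.RUNNING, State.FINISHED);
        } catch (Throwable t) {
            // Any failure in setup or in the invokable marks the task as failed.
            state.set(State.FAILED);
        }
    }

    public static void main(String[] args) {
        TaskSketch task = new TaskSketch("demo-task", () -> System.out.println("invokable running"));
        task.startTaskThread();
        task.awaitTermination();
        System.out.println("final state: " + task.getState());
    }
}
```

Using compareAndSet rather than a plain assignment means a transition only succeeds from the expected previous state, which is what lets a concurrent state change (for example a cancel request, not modeled here) win cleanly.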

:)
