FlinkOnYarn源码分析

Posted 架构与英文

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkOnYarn源码分析相关的知识,希望对你有一定的参考价值。

Flink可以对接第三方资源进行弹性伸缩,常见的有Yarn、K8S。Flink对接Yarn有两种模式,分别是session模式和per-job模式。由于生产环境多用per-job模式,因此本文主要分析对接Yarn时per-Job模式的提交过程。

Flink提交application语句:./flink run -m yarn-cluster ./flinkApp.jar

客户端提交命令过程分析

该语句最终有入口类org.apache.flink.client.cli.CliFrontend处理,

通过run()函数触发整个流程,最终是调用PackagedPrograminvokeInteractiveModeForExecution() 

FlinkOnYarn源码分析

callMainMethodmainClass,args) ,可以了解这是调用里我们程序的main函数。这里并不是我们程序直接提交的地方,而是我们业务代码程序进行编译的地方。我们的业务代码通过StreamExecutionEnvironment.execute()开始编译,首先会编译成Streamgraph,然后通过executeAsync(streamgraph)-AbstractJobClusterExecutor.execute()进行异步编译并提交。

FlinkOnYarn源码分析

FlinkOnYarn源码分析

我们重点关注下deployJobCluster接口,对接三方资源需要重写该接口。如下

FlinkOnYarn源码分析

FlinkOnYarn源码分析

在yarn的per-job模式下,使用的是yarnClusterDescriptor。

这里会把YarnJobClusterEntrypoint的类名当作参数传入,在deployInternal触发startAppMaster执行,顾名思义,是启动Application Master

FlinkOnYarn源码分析

startAppMaster函数里主要逻辑有打包本地jar和编译好的jobgraph一起上传到HDFS,然后初始化appcontext,最后提交到yarn。

FlinkOnYarn源码分析

提交到yarn之后的流程分析

1FlinkDispatcherResourceManager和JobManagerRunner启动流程分析

上文提到的YarnJobClusterEntrypoint是执行入口,会被resource manager通过反射调用其main函数。

FlinkOnYarn源码分析

因此通过一系列调用有ClusterEntrypoint类中runClusterEntrypoint-》startCluster-》runCluster。

在RunCluster内部有resourceManager和Dispatcher线程的创建和启动。由于session模式和per-job模式的最大区别是Dispatcher对任务的处理不同。Session模式下通过restful提交job,而per-job模式下,Dispatcher是从文件系统里获取到JobGraph的,我们重点看下Dispatcher。

FlinkOnYarn源码分析

FlinkOnYarn源码分析

FlinkOnYarn源码分析

FlinkOnYarn源码分析

FlinkOnYarn源码分析

经过调用,最终获得了job Graph,经过层层调用,该jobGraph会被传到dispatcher的recoveredJobs成员变量集合里。另外需要注意的是此时创建的Dispatcher的类型是Mini Dispatcher。recoveredJobs里面的job在Dispatcher.onstart函数里触发。

FlinkOnYarn源码分析

FlinkOnYarn源码分析

最后触发runjob,在runjob内部会创建JobManagerRunnerImpl,然后创建jobmaster,Jobmaster开始调度job,job会首先向resource manager申请资源。如果资源不足,则创建container并启动task Executor。

2FlinkTask Executor启动流程

ResourceManager的requestSlot(),通过slotManager.registerSlotRequest(slotRequest)触发slot申请,然后调用实现类slotManagerImpl的internalRequestSlot-》 fulfillPendingSlotRequestWithPendingTaskManagerSlot -》allocateResource 

FlinkOnYarn源码分析

FlinkOnYarn源码分析

再调用Resourcemanager内部类ResourceActionImp的allocateResource函数的startNewWorker  函数内部申请container并启动taskexecutor,其中startNewWorker接口对接yarn、k8s等平台实现container的申请工作。其过程如下:

FlinkOnYarn源码分析

FlinkOnYarn源码分析

ResourceActionImp的allocateResource -》startNewWorker -》ActiveResourceManager. startNewWorker -》requestNewWorker,最终触发resourceManagerDriver.requestResource 

FlinkOnYarn源码分析

FlinkOnYarn源码分析

该ResourceManagerDriver在per-job模式下,指的是YarnResourceManagerDriver,该类主要是分配container并启动TaskExecutor。YarnresourceManagerDriver内提供一个回调类 YarnContainerEventHandler  ,实现CallbackHandler接口 。这个回调类主要实现了两个方法:onContainersCompleted(), 和onContainersAllocated(), 其中onContainersAllocated是获得到已申请的 Container ,创建一个新线程, 通过异步的方式启动TaskExecutor,

FlinkOnYarn源码分析

startTaskExecutorInContainerAsyn调用createTaskExecutorLaunchContext

这里看到了YarnTaskExecutorRunner被传入到context中,在container内会调用YarnTaskExecutorRunner.runTaskManagerSecurely,最终触发taskManagerRunner.start执行。

通过以上分析我们就知道了在Yarn上Flink之Dispatcher、ResourceManager和JobManagerRunner、Task Executor是如何启动及协作的。它们之间详细的作业调度、作业执行、数据交换、应用容错及指标监控可以通过调试代码逐步分析。


以上是关于FlinkOnYarn源码分析的主要内容,如果未能解决你的问题,请参考以下文章

Flink on Yarn模式启动流程源代码分析

v73.02 鸿蒙内核源码分析(参考手册) | 阅读内核源码必备工具 | 百篇博客分析OpenHarmony源码

Mesos源码分析

Mybatis源码分析

Spring源码分析专题——目录

ARouter源码分析