Flink1.15源码解析--启动JobManager
Posted 宝哥大数据[离职找工作中,大佬帮内推下]
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink1.15源码解析--启动JobManager相关的知识,希望对你有一定的参考价值。
文章目录
- 一、前言
- 二、解析提交作业命令的参数
- 三、解析flink-conf.yaml配置文件
- 四、创建主节点
- 五、启动主节点
- 五、总结
- 返回[Flink1.15源码解析-总目录](https://blog.csdn.net/wuxintdrh/article/details/127796678)
一、前言
从上文 Flink1.15源码解析–启动脚本----start-cluster.sh 我们知道 JobManager 的入口类为org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint,
JobManager的入口类实际有三个
本文我们将 以 StandaloneSessionClusterEntrypoint 为例查看 JobManager 启动的整个流程。
main() 主要做了4件事:
- 1、解析提交作业命令的参数
- 2、解析flink-conf.yaml配置文件
- 3、创建 ClusterEntrypoint
- 4、启动 ClusterEntrypoint
public static void main(String[] args)
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(
LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
// 1、加载入口参数
final EntrypointClusterConfiguration entrypointClusterConfiguration =
ClusterEntrypointUtils.parseParametersOrExit(
args,
new EntrypointClusterConfigurationParserFactory(),
StandaloneSessionClusterEntrypoint.class);
// 2、 加载 flink-conf.yaml
Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
// 3、创建 ClusterEntrypoint
StandaloneSessionClusterEntrypoint entrypoint =
new StandaloneSessionClusterEntrypoint(configuration);
// 4、启动
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
二、解析提交作业命令的参数
三、解析flink-conf.yaml配置文件
四、创建主节点
五、启动主节点
// 4、启动
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
继续,会发现调用的是父类的 startCluster()
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint)
final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try
// 调用父类的 startCluster() 启动集群
clusterEntrypoint.startCluster();
catch (ClusterEntrypointException e)
LOG.error(
String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
e);
System.exit(STARTUP_FAILURE_RETURN_CODE);
int returnCode;
Throwable throwable = null;
try
returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
catch (Throwable e)
throwable = ExceptionUtils.stripExecutionException(e);
returnCode = RUNTIME_FAILURE_RETURN_CODE;
LOG.info(
"Terminating cluster entrypoint process with exit code .",
clusterEntrypointName,
returnCode,
throwable);
System.exit(returnCode);
startCluster() 的详细了流程见 Flink1.15源码解析-- ClusterEntrypoint----startCluster
五、总结
关于Flink的主节点 JobManager,他只是一个逻辑上的主节点,针对不同的部署模式,主节点的实现类也不同。
JobManager(逻辑)有三大核心内容,分别为ResourceManager、Dispatcher和WebmonitorEndpoin:
ResourceManager:
- Flink集群的资源管理器,只有一个,关于Slot的管理和申请等工作,都有它负责
Dispatcher:
-
1、负责接收用户提交的JobGraph,然后启动一个JobMaster,类似于Yarn中的AppMaster和Spark中的Driver。
-
2、内有一个持久服务:JobGraphStore,负责存储JobGraph。当构建执行图或物理执行图时主节点宕机并恢复,则可以从这里重新拉取作业JobGraph
WebMonitorEndpoint:
- Rest服务,内部有一个Netty服务,客户端的所有请求都由该组件接收处理
用一个例子来描述这三个组件的功能:
当Client提交一个Job到集群时(Client会把Job构建成一个JobGraph),主节点接收到提交的job的Rest请求后,WebMonitorEndpoint 会通过 Router 进行解析找到对应的 Handler 来执行处理,处理完毕后交由 Dispatcher,Dispatcher 负责大气 JobMaster 来负责这个Job内部的Task的部署执行,执行Task所需的资源,JobMaster向ResourceManager申请。
返回Flink1.15源码解析-总目录
以上是关于Flink1.15源码解析--启动JobManager的主要内容,如果未能解决你的问题,请参考以下文章
Flink1.15源码解析--启动JobManager----ResourceManager启动
Flink1.15源码解析--启动JobManager----ResourceManager启动
Flink1.15源码解析--启动JobManager----Dispatcher启动