Flink1.15源码解析--启动JobManager
Posted 宝哥大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink1.15源码解析--启动JobManager相关的知识,希望对你有一定的参考价值。
文章目录
- 一、前言
- 二、解析提交作业命令的参数
- 三、解析flink-conf.yaml配置文件
- 四、创建主节点
- 五、启动主节点
- 返回[Flink1.15源码解析-总目录](https://blog.csdn.net/wuxintdrh/article/details/127796678)
一、前言
从上文 Flink1.15源码解析–启动脚本----start-cluster.sh 我们知道 JobManager 的入口类为org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint,
JobManager的入口类实际有三个
本文我们将 以 StandaloneSessionClusterEntrypoint 为例查看 JobManager 启动的整个流程。
main() 主要做了4件事:
- 1、解析提交作业命令的参数
- 2、解析flink-conf.yaml配置文件
- 3、创建 ClusterEntrypoint
- 4、启动 ClusterEntrypoint
public static void main(String[] args)
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(
LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
// 1、加载入口参数
final EntrypointClusterConfiguration entrypointClusterConfiguration =
ClusterEntrypointUtils.parseParametersOrExit(
args,
new EntrypointClusterConfigurationParserFactory(),
StandaloneSessionClusterEntrypoint.class);
// 2、 加载 flink-conf.yaml
Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
// 3、创建 ClusterEntrypoint
StandaloneSessionClusterEntrypoint entrypoint =
new StandaloneSessionClusterEntrypoint(configuration);
// 4、启动
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
二、解析提交作业命令的参数
三、解析flink-conf.yaml配置文件
四、创建主节点
五、启动主节点
// 4、启动
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
继续,会发现调用的是父类的 startCluster()
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint)
final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try
// 调用父类的 startCluster() 启动集群
clusterEntrypoint.startCluster();
catch (ClusterEntrypointException e)
LOG.error(
String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
e);
System.exit(STARTUP_FAILURE_RETURN_CODE);
int returnCode;
Throwable throwable = null;
try
returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
catch (Throwable e)
throwable = ExceptionUtils.stripExecutionException(e);
returnCode = RUNTIME_FAILURE_RETURN_CODE;
LOG.info(
"Terminating cluster entrypoint process with exit code .",
clusterEntrypointName,
returnCode,
throwable);
System.exit(returnCode);
startCluster() 的详细了流程见 Flink1.15源码解析-- ClusterEntrypoint----startCluster
返回Flink1.15源码解析-总目录
以上是关于Flink1.15源码解析--启动JobManager的主要内容,如果未能解决你的问题,请参考以下文章
Flink1.15源码解析--启动JobManager----ResourceManager启动
Flink1.15源码解析--启动JobManager----ResourceManager启动
Flink1.15源码解析--启动JobManager----Dispatcher启动