Flink1.15源码解析--启动JobManager

Posted 宝哥大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink1.15源码解析--启动JobManager相关的知识,希望对你有一定的参考价值。

文章目录

一、前言

从上文 Flink1.15源码解析–启动脚本----start-cluster.sh 我们知道 JobManager 的入口类为org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint,
JobManager的入口类实际有三个

本文我们将 以 StandaloneSessionClusterEntrypoint 为例查看 JobManager 启动的整个流程。

main() 主要做了4件事:

  • 1、解析提交作业命令的参数
  • 2、解析flink-conf.yaml配置文件
  • 3、创建 ClusterEntrypoint
  • 4、启动 ClusterEntrypoint
    public static void main(String[] args) 
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(
                LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        // 1、加载入口参数
        final EntrypointClusterConfiguration entrypointClusterConfiguration =
                ClusterEntrypointUtils.parseParametersOrExit(
                        args,
                        new EntrypointClusterConfigurationParserFactory(),
                        StandaloneSessionClusterEntrypoint.class);
        // 2、 加载 flink-conf.yaml
        Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

        // 3、创建 ClusterEntrypoint
        StandaloneSessionClusterEntrypoint entrypoint =
                new StandaloneSessionClusterEntrypoint(configuration);

        // 4、启动
        ClusterEntrypoint.runClusterEntrypoint(entrypoint);
    

二、解析提交作业命令的参数

三、解析flink-conf.yaml配置文件

四、创建主节点

五、启动主节点

        // 4、启动
        ClusterEntrypoint.runClusterEntrypoint(entrypoint);

继续,会发现调用的是父类的 startCluster()

    public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) 

        final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
        try 
            // 调用父类的 startCluster() 启动集群
            clusterEntrypoint.startCluster();
         catch (ClusterEntrypointException e) 
            LOG.error(
                    String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
                    e);
            System.exit(STARTUP_FAILURE_RETURN_CODE);
        

        int returnCode;
        Throwable throwable = null;

        try 
            returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
         catch (Throwable e) 
            throwable = ExceptionUtils.stripExecutionException(e);
            returnCode = RUNTIME_FAILURE_RETURN_CODE;
        

        LOG.info(
                "Terminating cluster entrypoint process  with exit code .",
                clusterEntrypointName,
                returnCode,
                throwable);
        System.exit(returnCode);
    

startCluster() 的详细了流程见 Flink1.15源码解析-- ClusterEntrypoint----startCluster

返回Flink1.15源码解析-总目录

以上是关于Flink1.15源码解析--启动JobManager的主要内容,如果未能解决你的问题,请参考以下文章

Flink1.15源码解析--启动JobManager----ResourceManager启动

Flink1.15源码解析--启动JobManager----ResourceManager启动

Flink1.15源码解析--启动JobManager----Dispatcher启动

Flink1.15源码解析--启动JobManager----Dispatcher启动

Flink1.15源码解析

Flink1.15源码解析--启动JobManager----WebMonitorEndpoint启动