Flink1.15源码解析--启动JobManager

Posted 宝哥大数据[离职找工作中,大佬帮内推下]

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink1.15源码解析--启动JobManager相关的知识,希望对你有一定的参考价值。

文章目录

一、前言

从上文 Flink1.15源码解析–启动脚本----start-cluster.sh 我们知道 JobManager 的入口类为org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint,
JobManager的入口类实际有三个

本文我们将 以 StandaloneSessionClusterEntrypoint 为例查看 JobManager 启动的整个流程。

main() 主要做了4件事:

  • 1、解析提交作业命令的参数
  • 2、解析flink-conf.yaml配置文件
  • 3、创建 ClusterEntrypoint
  • 4、启动 ClusterEntrypoint
    public static void main(String[] args) 
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(
                LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        // 1、加载入口参数
        final EntrypointClusterConfiguration entrypointClusterConfiguration =
                ClusterEntrypointUtils.parseParametersOrExit(
                        args,
                        new EntrypointClusterConfigurationParserFactory(),
                        StandaloneSessionClusterEntrypoint.class);
        // 2、 加载 flink-conf.yaml
        Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

        // 3、创建 ClusterEntrypoint
        StandaloneSessionClusterEntrypoint entrypoint =
                new StandaloneSessionClusterEntrypoint(configuration);

        // 4、启动
        ClusterEntrypoint.runClusterEntrypoint(entrypoint);
    

二、解析提交作业命令的参数

三、解析flink-conf.yaml配置文件

四、创建主节点

五、启动主节点

        // 4、启动
        ClusterEntrypoint.runClusterEntrypoint(entrypoint);

继续,会发现调用的是父类的 startCluster()

    public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) 

        final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
        try 
            // 调用父类的 startCluster() 启动集群
            clusterEntrypoint.startCluster();
         catch (ClusterEntrypointException e) 
            LOG.error(
                    String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
                    e);
            System.exit(STARTUP_FAILURE_RETURN_CODE);
        

        int returnCode;
        Throwable throwable = null;

        try 
            returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
         catch (Throwable e) 
            throwable = ExceptionUtils.stripExecutionException(e);
            returnCode = RUNTIME_FAILURE_RETURN_CODE;
        

        LOG.info(
                "Terminating cluster entrypoint process  with exit code .",
                clusterEntrypointName,
                returnCode,
                throwable);
        System.exit(returnCode);
    

startCluster() 的详细了流程见 Flink1.15源码解析-- ClusterEntrypoint----startCluster

五、总结

关于Flink的主节点 JobManager,他只是一个逻辑上的主节点,针对不同的部署模式,主节点的实现类也不同。

JobManager(逻辑)有三大核心内容,分别为ResourceManager、Dispatcher和WebmonitorEndpoin:

ResourceManager

  • Flink集群的资源管理器,只有一个,关于Slot的管理和申请等工作,都有它负责

Dispatcher

  • 1、负责接收用户提交的JobGraph,然后启动一个JobMaster,类似于Yarn中的AppMaster和Spark中的Driver。

  • 2、内有一个持久服务:JobGraphStore,负责存储JobGraph。当构建执行图或物理执行图时主节点宕机并恢复,则可以从这里重新拉取作业JobGraph

WebMonitorEndpoint

  • Rest服务,内部有一个Netty服务,客户端的所有请求都由该组件接收处理

用一个例子来描述这三个组件的功能:

当Client提交一个Job到集群时(Client会把Job构建成一个JobGraph),主节点接收到提交的job的Rest请求后,WebMonitorEndpoint 会通过 Router 进行解析找到对应的 Handler 来执行处理,处理完毕后交由 Dispatcher,Dispatcher 负责大气 JobMaster 来负责这个Job内部的Task的部署执行,执行Task所需的资源,JobMaster向ResourceManager申请。

返回Flink1.15源码解析-总目录

以上是关于Flink1.15源码解析--启动JobManager的主要内容,如果未能解决你的问题,请参考以下文章

Flink1.15源码解析--启动JobManager----ResourceManager启动

Flink1.15源码解析--启动JobManager----ResourceManager启动

Flink1.15源码解析--启动JobManager----Dispatcher启动

Flink1.15源码解析--启动JobManager----Dispatcher启动

Flink1.15源码解析

Flink1.15源码解析--启动JobManager----WebMonitorEndpoint启动