flinkFlink 1.12.2 源码浅析 : yarn-per-job模式解析 JobMasger启动 YarnJobClusterEntrypoint

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flinkFlink 1.12.2 源码浅析 : yarn-per-job模式解析 JobMasger启动 YarnJobClusterEntrypoint相关的知识,希望对你有一定的参考价值。

1.概述

转载:Flink 1.12.2 源码浅析 : yarn-per-job模式解析 [三]

上一章:【flink】Flink 1.12.2 源码浅析 : yarn-per-job模式解析 yarn 提交过程解析

整体流程图

二 .代码分析


上节我们看到 yarnClient.submitApplication(appContext); 将任务提交到yarn集群.

在AppMaster中执行的程序的代码如下 :

$JAVA_HOME/bin/java 
-Xmx1073741824 
-Xms1073741824 
-XX:MaxMetaspaceSize=268435456 
-Dlog.file="<LOG_DIR>/jobmanager.log" 
-Dlog4j.configuration=file:log4j.properties 
-Dlog4j.configurationFile=file:log4j.properties 

org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint 

-D jobmanager.memory.off-heap.size=134217728b 
-D jobmanager.memory.jvm-overhead.min=201326592b 
-D jobmanager.memory.jvm-metaspace.size=268435456b 
-D jobmanager.memory.heap.size=1073741824b 
-D jobmanager.memory.jvm-overhead.max=201326592b 1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err

其中入口类为 : org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

所以,我们就从这个类开始看…

2.1. YarnJobClusterEntrypoint#main

AM的程序入口就是 YarnJobClusterEntrypoint#main 方法.

  1. 加载配置
  2. 构建YarnJobClusterEntrypoint
  3. ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
   // ------------------------------------------------------------------------
    //  The executable entry point for the Yarn Application Master Process
    //  for a single Flink job.
    // ------------------------------------------------------------------------

    public static void main(String[] args) {
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(
                LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        Map<String, String> env = System.getenv();

        final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());

        Preconditions.checkArgument(
                workingDirectory != null,
                "Working directory variable (%s) not set",
                ApplicationConstants.Environment.PWD.key());

        try {
            YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
        } catch (IOException e) {
            LOG.warn("Could not log YARN environment information.", e);
        }

        final Configuration dynamicParameters =
                ClusterEntrypointUtils.parseParametersOrExit(
                        args,
                        new DynamicParametersConfigurationParserFactory(),
                        YarnJobClusterEntrypoint.class);

        // 1.构建配置
        final Configuration configuration =
                YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);

        // 2.构建YarnJobClusterEntrypoint
        YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(configuration);

        // 3.启动
        ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
    }

2.2. ClusterEntrypoint#runCluster

ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint); 之后会进行多次跳转, 最终调用ClusterEntrypoint#runCluster方法启动集群.

在这个方法里面通过createDispatcherResourceManagerComponentFactory$create 创建JobManager内的组件 : Dispatcher / ResourceManager / JobMaster

// 执行
ClusterEntrypoint#runClusterEntrypoint
		--> ClusterEntrypoint#startCluster();
					--> ClusterEntrypoint#runCluster(configuration, pluginManager);



    private void runCluster(Configuration configuration, PluginManager pluginManager)
            throws Exception {
        synchronized (lock) {

            // 初始化插件...
            // 构建各种服务, 比如心跳,RPC ....
            initializeServices(configuration, pluginManager);

            // 在配置中写入host和port 信息
            // write host information into configuration
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());


            // 创建JobManager内的组件 : Dispatcher / ResourceManager / JobMaster
            // 构建 DispatcherResourceManagerComponentFactory
            final DispatcherResourceManagerComponentFactory
                    dispatcherResourceManagerComponentFactory =
                            createDispatcherResourceManagerComponentFactory(configuration);

            // 构建集群 组件 DispatcherResourceManagerComponent
            clusterComponent =
                    dispatcherResourceManagerComponentFactory.create(
                            configuration,
                            ioExecutor,
                            commonRpcService,
                            haServices,
                            blobServer,
                            heartbeatServices,
                            metricRegistry,
                            archivedExecutionGraphStore,
                            new RpcMetricQueryServiceRetriever(
                                    metricRegistry.getMetricQueryServiceRpcService()),
                            this);

            // 关闭操作...
            clusterComponent
                    .getShutDownFuture()
                    .whenComplete(
                            (ApplicationStatus applicationStatus, Throwable throwable) -> {
                                if (throwable != null) {
                                    shutDownAsync(
                                            ApplicationStatus.UNKNOWN,
                                            ExceptionUtils.stringifyException(throwable),
                                            false);
                                } else {
                                    // This is the general shutdown path. If a separate more
                                    // specific shutdown was
                                    // already triggered, this will do nothing
                                    shutDownAsync(applicationStatus, null, true);
                                }
                            });
        }
    }


2.3. DefaultDispatcherResourceManagerComponentFactory#create

核心的服务构建&启动都在这里,
包括ResourceManager , Dispatcher , JobManager 以及WEB UI , History Server 相关服务 .


    @Override
    public DispatcherResourceManagerComponent create(
            Configuration configuration,
            Executor ioExecutor,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            ArchivedExecutionGraphStore archivedExecutionGraphStore,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {

        LeaderRetrievalService dispatcherLeaderRetrievalService = null;
        LeaderRetrievalService resourceManagerRetrievalService = null;
        WebMonitorEndpoint<?> webMonitorEndpoint = null;
        ResourceManager<?> resourceManager = null;
        DispatcherRunner dispatcherRunner = null;

        try {

            // Dispatcher 高可用相关
            dispatcherLeaderRetrievalService =
                    highAvailabilityServices.getDispatcherLeaderRetriever();

            // ResourceManager 高可用相关
            resourceManagerRetrievalService =
                    highAvailabilityServices.getResourceManagerLeaderRetriever();

            // Dispatcher 网关相关
            final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =
                    new RpcGatewayRetriever<>(
                            rpcService,
                            DispatcherGateway.class,
                            DispatcherId::fromUuid,
                            new ExponentialBackoffRetryStrategy(
                                    12, Duration.ofMillis(10), Duration.ofMillis(50)));
            // ResourceManager 网关相关
            final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =
                    new RpcGatewayRetriever<>(
                            rpcService,
                            ResourceManagerGateway.class,
                            ResourceManagerId::fromUuid,
                            new ExponentialBackoffRetryStrategy(
                                    12, Duration.ofMillis(10), Duration.ofMillis(50)));


            // 构建  Executor
            final ScheduledExecutorService executor =
                    WebMonitorEndpoint.createExecutorService(
                            configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                            configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                            "DispatcherRestEndpoint");

            // 10000L
            final long updateInterval =
                    configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);

            final MetricFetcher metricFetcher =
                    updateInterval == 0
                            ? VoidMetricFetcher.INSTANCE
                            : MetricFetcherImpl.fromConfiguration(
                                    configuration,
                                    metricQueryServiceRetriever,
                                    dispatcherGatewayRetriever,
                                    executor);

            // WEB UI 相关服务
            webMonitorEndpoint =
                    restEndpointFactory.createRestEndpoint(
                            configuration,
                            dispatcherGatewayRetriever,
                            resourceManagerGatewayRetriever,
                            blobServer,
                            executor,
                            metricFetcher,
                            highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                            fatalErrorHandler);

            log.debug("Starting Dispatcher REST endpoint.");
            webMonitorEndpoint.start();


            // 获取主机名称
            final String hostname = RpcUtils.getHostname(rpcService);

            // 获取resourceManager
            resourceManager =
                    resourceManagerFactory.createResourceManager(
                            configuration,
                            ResourceID.generate(),
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            fatalErrorHandler,
                            new ClusterInformation(hostname, blobServer.getPort()),
                            webMonitorEndpoint.getRestBaseUrl(),
                            metricRegistry,
                            hostname,
                            ioExecutor);

            // 获取 history server 相关
            final HistoryServerArchivist historyServerArchivist =
                    HistoryServerArchivist.createHistoryServerArchivist(
                            configuration, webMonitorEndpoint, ioExecutor);

            final PartialDispatcherServices partialDispatcherServices =
                    new PartialDispatcherServices(
                            configuration,
                            highAvailabilityServices,
                            resourceManagerGatewayRetriever,
                            blobServer,
                            heartbeatServices,
                            () ->
                                    MetricUtils.instantiateJobManagerMetricGroup(
                                            metricRegistry, hostname),
                            archivedExecutionGraphStore,
                            fatalErrorHandler,
                            historyServerArchivist,
                            metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
                            ioExecutor);

            // 创建/启动   Dispatcher : dispatcher会创建和启动JobManager
            log.debug("Starting Dispatcher.");
            dispatcherRunner =
                    dispatcherRunnerFactory.createDispatcherRunner(
                            highAvailabilityServices.getDispatcherLeaderElectionService(),
                            fatalErrorHandler,
                            new HaServicesJobGraphStoreFactory(highAvailabilityServices),
                            ioExecutor,
                            rpcService,
                            partialDispatcherServices);

            // 启动 ResourceManager
            log.debug("Starting ResourceManager.");
            resourceManager.start();

            resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
            dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

            return new DispatcherResourceManagerComponent(
                    dispatcherRunner,
                    DefaultResourceManagerService.createFor(resourceManager),
                    dispatcherLeaderRetrievalService,
                    resourceManagerRetrievalService,
                    webMonitorEndpoint,
                    fatalErrorHandler);

        } catch (Exception exception) {
            // clean up all started components
            if (dispatcherLeaderRetrievalService != null) {
                try {
                    dispatcherLeaderRetrievalService.stop();
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
            }

            if (resourceManagerRetrievalService != null) {
                try {
                    resourceManagerRetrievalService.stop();
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
            }

            final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

            if (webMonitorEndpoint != null) {
                terminationFutures.add(webMonitorEndpoint.closeAsync());
            }

            if (resourceManager != null) {
                terminationFutures.add(resourceManager.closeAsync());
            }

            if (dispatcherRunner != null) {
                terminationFutures.add(dispatcherRunner.closeAsync());
            }

            final FutureUtils.ConjunctFuture<Void> terminationFuture =
                    FutureUtils.completeAll(terminationFutures);

            try {
                terminationFuture.get();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }

            throw new FlinkException(
                    "Could not create the DispatcherResourceManagerComponent.", exception);
        }
    }

三 . Dispatcher 相关

Dispatcher 主要有两个用途.

  1. 接收用户代码
  2. 创建/启动JobManager
  3. 具体的实现类是DispatcherRunner .
dispatcherRunnerFactory.createDispatcherRunner(
                            highAvailabilityServices.getDispatcherLeaderElectionService(),
                            fatalErrorHandler,
                            new HaServicesJobGraphStoreFactory(highAvailabilityServices),
                            ioExecutor,
                            rpcService,
                            partialDispatcherServices);

3.1. 构建

调用的入口是DefaultDispatcherResourceManagerComponentFactory#create.
由DispatcherRunner 实现.
DispatcherRunner 的构建是通过DefaultDispatcherRunnerFactory#createDispatcherRunner 作为入口创建的.

实现类是 : DefaultDispatcherRunner.create

DefaultDispatcherResourceManagerComponentFactory#create

3.2. 启动

启动的入口是DefaultDispatcherRunnerFactory#createDispatcherRunner 最终不断的跳转到了
DefaultDispatcherRunner#startNewDispatcherLeaderProcess方法
JobDispatcherLeaderProcess#onStart

    private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {

        // 停止之前的DispatcherLeader 进程
        stopDispatcherLeaderProcess();

        // 构建新的 DispatcherLeader 进程
        dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);

        final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;

        // 启动
        FutureUtils.assertNoException(
                previousDispatcherLeaderProcessTerminationFuture.thenRun(
                        newDispatcherLeaderProcess::start));
    }

调用JobDispatcherLeaderProcess#onStart 方法…

    @Override
    protected void onStart() {
        // create
        final DispatcherGatewayService dispatcherService =
                dispatcherGatewayServiceFactory.create(
                        DispatcherId.fromUuid(getLeaderSessionId()),
                        Collections.singleton(jobGraph),
                        ThrowingJobGraphWriter.INSTANCE);

        completeDispatcherSetup(dispatcherService);
    }

最终由 DefaultDispatcherGatewayServiceFactory#create 创建/启动Dispatcher

public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            JobGraphWriter jobGraphWriter) {

        // 定义Dispatcher
        final Dispatcher dispatcher;
        try {
            //创建Dispatcher
            dispatcher =
                    dispatcherFactory.createDispatcher(
                            rpcService,
                            fencingToken,
                            recoveredJobs,
                            (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                    new NoOpDispatcherBootstrap(),
                            PartialDispatcherServicesWithJobGraphStore.from(
                                    partialDispatcherServices, jobGraphWriter));
        } catch (Exception e) {
            throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
        }

        // 启动 Dispatcher :  Dispatcher#OnStart
        //      1. 接收用户作业
        //      2. 启动JobMaster
        dispatcher.start();

        return DefaultDispatcherGatewayService.from(dispatcher);
    }

四 .JobManager 相关

4.1. 启动

JobManager是由Dispatcher的onStart方法作为入口,进行启动的.
执行的方法为 : Dispatcher#startRecoveredJobs

    private void startRecoveredJobs() {
        // 处理需要恢复的Job
        for (flinkFlink 1.12.2 源码浅析 : Task 浅析

flinkFlink 1.12.2 源码浅析 : Task数据输入

flinkFlink 1.12.2 源码浅析 :Task数据输出

flinkFlink 1.12.2 源码浅析 : yarn-per-job模式解析 TaskMasger 启动

flinkFlink 1.12.2 源码浅析 : yarn-per-job模式解析 yarn 提交过程解析

flinkFlink 1.12.2 源码浅析 : yarn-per-job模式解析 JobMasger启动 YarnJobClusterEntrypoint