Flink 1.15 Source Code Analysis -- Starting the JobManager -- Dispatcher Startup

Posted by 宝哥大数据



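Preface
From the previous article, "Flink 1.15 Source Code Analysis -- DispatcherResourceManagerComponent", we know the rough flow of how ResourceManagerServiceImpl is created and started.

Next, let's walk through the Dispatcher side in detail.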

1. Creating the DispatcherRunner

// org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create
            final HistoryServerArchivist historyServerArchivist =
                    HistoryServerArchivist.createHistoryServerArchivist(
                            configuration, webMonitorEndpoint, ioExecutor);

            final DispatcherOperationCaches dispatcherOperationCaches =
                    new DispatcherOperationCaches(
                            configuration.get(RestOptions.ASYNC_OPERATION_STORE_DURATION));

            final PartialDispatcherServices partialDispatcherServices =
                    new PartialDispatcherServices(
                            configuration,
                            highAvailabilityServices,
                            resourceManagerGatewayRetriever,
                            blobServer,
                            heartbeatServices,
                            () ->
                                    JobManagerMetricGroup.createJobManagerMetricGroup(
                                            metricRegistry, hostname),
                            executionGraphInfoStore,
                            fatalErrorHandler,
                            historyServerArchivist,
                            metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
                            ioExecutor,
                            dispatcherOperationCaches);

            log.debug("Starting Dispatcher.");
            dispatcherRunner =
                    dispatcherRunnerFactory.createDispatcherRunner(
                            highAvailabilityServices.getDispatcherLeaderElectionService(),
                            fatalErrorHandler,
                            new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
                            ioExecutor,
                            rpcService,
                            partialDispatcherServices);

The DispatcherRunner is created by the factory DefaultDispatcherRunnerFactory:

// org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory#createDispatcherRunner
    @Override
    public DispatcherRunner createDispatcherRunner(
            LeaderElectionService leaderElectionService,
            FatalErrorHandler fatalErrorHandler,
            JobPersistenceComponentFactory jobPersistenceComponentFactory,
            Executor ioExecutor,
            RpcService rpcService,
            PartialDispatcherServices partialDispatcherServices)
            throws Exception {

        final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
                dispatcherLeaderProcessFactoryFactory.createFactory(
                        jobPersistenceComponentFactory,
                        ioExecutor,
                        rpcService,
                        partialDispatcherServices,
                        fatalErrorHandler);

        return DefaultDispatcherRunner.create(
                leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
    }
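The two-step shape of this method (first ask a "factory factory" for a process factory, then hand that factory to the runner) can be easier to see without the Flink types. The following is only a minimal sketch under stated assumptions: every type in it (LeaderProcess, LeaderProcessFactory, LeaderProcessFactoryFactory, Runner) is a hypothetical stand-in, the shared services are reduced to a plain String, and the idea that each grant of leadership starts a fresh process reflects my reading of the runner rather than anything shown in the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-ins; these are NOT Flink's real types.
final class DispatcherRunnerSketch {

    /** Stand-in for a DispatcherLeaderProcess. */
    interface LeaderProcess {
        String describe();
    }

    /** Stand-in for DispatcherLeaderProcessFactory: builds one process per leader session. */
    interface LeaderProcessFactory {
        LeaderProcess create(String leaderSessionId);
    }

    /** Stand-in for the "factory factory": binds the shared services exactly once. */
    interface LeaderProcessFactoryFactory {
        LeaderProcessFactory createFactory(String sharedServices);
    }

    /** Stand-in for the runner: keeps the process factory until leadership is granted. */
    static final class Runner {
        private final LeaderProcessFactory processFactory;
        private final List<String> started = new ArrayList<>();

        Runner(LeaderProcessFactory processFactory) {
            this.processFactory = processFactory;
        }

        /** Assumed behaviour: each grant of leadership starts a fresh process. */
        void grantLeadership(String leaderSessionId) {
            started.add(processFactory.create(leaderSessionId).describe());
        }

        List<String> startedProcesses() {
            return started;
        }
    }

    /** Mirrors the shape of createDispatcherRunner: build the process factory, then the runner. */
    static Runner createRunner(LeaderProcessFactoryFactory factoryFactory, String sharedServices) {
        LeaderProcessFactory processFactory = factoryFactory.createFactory(sharedServices);
        return new Runner(processFactory);
    }

    public static void main(String[] args) {
        // The services are captured once; the session id is supplied later, per election.
        LeaderProcessFactoryFactory factoryFactory =
                services -> sessionId -> () -> "process(" + services + ", " + sessionId + ")";
        Runner runner = createRunner(factoryFactory, "partialDispatcherServices");
        runner.grantLeadership("session-1");
        runner.grantLeadership("session-2");
        System.out.println(runner.startedProcesses());
    }
}
```

The point of the indirection in this sketch is that the long-lived services are bound when the runner is created, while the per-session value is only known once leadership is actually granted.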

1.1 Initializing the LeaderElectionService

The first argument passed to createDispatcherRunner is highAvailabilityServices.getDispatcherLeaderElectionService():

// org.apache.flink.runtime.highavailability.AbstractHaServices#getDispatcherLeaderElectionService
    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        return createLeaderElectionService(getLeaderPathForDispatcher());
    }


highAvailabilityServices is the HA service that was initialized in ClusterEntrypoint. Taking ZooKeeperHaServices as an example, the leader election service that gets created is a DefaultLeaderElectionService:

    // ZooKeeperHaServices#createLeaderElectionService
    @Override
    protected LeaderElectionService createLeaderElectionService(String leaderPath) {
        return ZooKeeperUtils.createLeaderElectionService(getCuratorFramework(), leaderPath);
    }

    // ZooKeeperUtils#createLeaderElectionService
    public static DefaultLeaderElectionService createLeaderElectionService(
            final CuratorFramework client, final String path) {
        return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));
    }
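The code above only constructs the election service around a driver factory; the driver itself is not created here. As a rough illustration of why the service receives a factory rather than a ready-made driver, here is a minimal sketch under stated assumptions. All types in it (Contender, EventHandler, Driver, DriverFactory, ElectionService) are hypothetical stand-ins rather than Flink's interfaces, the alwaysLeader() driver is an in-memory toy that wins the election immediately instead of talking to ZooKeeper, and the assumption that the driver is created when a contender is started is my own reading, not something shown in the excerpt.

```java
import java.util.UUID;

// Hypothetical, heavily simplified stand-ins; these are NOT Flink's real types.
final class LeaderElectionSketch {

    /** Component that wants to become leader (assumed: the DispatcherRunner plays this role). */
    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    /** Callback the driver uses to report election events back to the service. */
    interface EventHandler {
        void onGrantLeadership();
    }

    /** Backend-specific part of the election (ZooKeeper in the article). */
    interface Driver {}

    /** Creates a driver that reports to the given handler. */
    interface DriverFactory {
        Driver create(EventHandler handler);
    }

    /** Backend-agnostic service: owns the contender and delegates the election to a driver. */
    static final class ElectionService implements EventHandler {
        private final DriverFactory driverFactory;
        private Contender contender;
        private Driver driver;

        ElectionService(DriverFactory driverFactory) {
            this.driverFactory = driverFactory;
        }

        /** Assumed behaviour: the driver is only created once a contender registers. */
        void start(Contender contender) {
            this.contender = contender;
            this.driver = driverFactory.create(this);
        }

        @Override
        public void onGrantLeadership() {
            // A fresh session id identifies this particular period of leadership.
            contender.grantLeadership(UUID.randomUUID());
        }
    }

    /** Toy in-memory driver factory that wins the election immediately. */
    static DriverFactory alwaysLeader() {
        return handler -> {
            handler.onGrantLeadership();
            return new Driver() {};
        };
    }

    public static void main(String[] args) {
        ElectionService service = new ElectionService(alwaysLeader());
        service.start(id -> System.out.println("leadership granted, session " + id));
    }
}
```

In this sketch, swapping the election backend only means passing a different DriverFactory; the service and the contender stay unchanged.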
