Flink 1.15 Source Code Analysis -- Starting the JobManager -- ResourceManager Startup

Posted by 宝哥大数据


1. Preface

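The previous post, "Flink 1.15 Source Code Analysis -- DispatcherResourceManagerComponent", showed that DefaultDispatcherResourceManagerComponentFactory#create both creates and starts ResourceManagerServiceImpl. This post walks through those two steps.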

2. Creating ResourceManagerServiceImpl

//org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create
            resourceManagerService =
                    ResourceManagerServiceImpl.create(
                            resourceManagerFactory,
                            configuration,
                            resourceId,
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            fatalErrorHandler,
                            new ClusterInformation(hostname, blobServer.getPort()),
                            webMonitorEndpoint.getRestBaseUrl(),
                            metricRegistry,
                            hostname,
                            ioExecutor);


    // org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl#create
    public static ResourceManagerServiceImpl create(
            ResourceManagerFactory<?> resourceManagerFactory,
            Configuration configuration,
            ResourceID resourceId,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            FatalErrorHandler fatalErrorHandler,
            ClusterInformation clusterInformation,
            @Nullable String webInterfaceUrl,
            MetricRegistry metricRegistry,
            String hostname,
            Executor ioExecutor)
            throws ConfigurationException {

        return new ResourceManagerServiceImpl(
                resourceManagerFactory,
                resourceManagerFactory.createResourceManagerProcessContext(
                        configuration,
                        resourceId,
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        fatalErrorHandler,
                        clusterInformation,
                        webInterfaceUrl,
                        metricRegistry,
                        hostname,
                        ioExecutor));
    }



    private ResourceManagerServiceImpl(
            ResourceManagerFactory<?> resourceManagerFactory,
            ResourceManagerProcessContext rmProcessContext) {
        this.resourceManagerFactory = checkNotNull(resourceManagerFactory);
        this.rmProcessContext = checkNotNull(rmProcessContext);

        // The leader election service comes from the HA services (see 2.1)
        this.leaderElectionService =
                rmProcessContext
                        .getHighAvailabilityServices()
                        .getResourceManagerLeaderElectionService();
        this.fatalErrorHandler = rmProcessContext.getFatalErrorHandler();
        this.ioExecutor = rmProcessContext.getIoExecutor();

        this.handleLeaderEventExecutor = Executors.newSingleThreadExecutor();
        this.serviceTerminationFuture = new CompletableFuture<>();

        this.running = false;
        // ResourceManager state: no leader instance and no leader session yet
        this.leaderResourceManager = null;
        this.leaderSessionID = null;
        this.previousResourceManagerTerminationFuture = FutureUtils.completedVoidFuture();
    }
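The create method and the private constructor above follow a common shape: a static factory packs its long argument list into one context object, and the constructor then reads what it needs from that context. Below is a minimal sketch of that shape. The names ProcessContextSketch, ProcessContext, and Service are hypothetical stand-ins and the context is cut down to two fields, so this is an illustration rather than Flink's own code.

```java
import java.util.Objects;
import java.util.concurrent.Executor;

class ProcessContextSketch {

    /** Hypothetical, trimmed-down analogue of ResourceManagerProcessContext. */
    static final class ProcessContext {
        private final String hostname;
        private final Executor ioExecutor;

        ProcessContext(String hostname, Executor ioExecutor) {
            this.hostname = Objects.requireNonNull(hostname);
            this.ioExecutor = Objects.requireNonNull(ioExecutor);
        }

        String getHostname() {
            return hostname;
        }

        Executor getIoExecutor() {
            return ioExecutor;
        }
    }

    /** Hypothetical analogue of the service: private constructor, static factory. */
    static final class Service {
        private final ProcessContext context;
        private final Executor ioExecutor;
        private boolean running;

        private Service(ProcessContext context) {
            this.context = Objects.requireNonNull(context);
            // Pull collaborators out of the context once, at construction time.
            this.ioExecutor = context.getIoExecutor();
            // Construction never starts anything; starting is a separate step.
            this.running = false;
        }

        static Service create(String hostname, Executor ioExecutor) {
            return new Service(new ProcessContext(hostname, ioExecutor));
        }

        boolean isRunning() {
            return running;
        }

        String hostname() {
            return context.getHostname();
        }

        Executor ioExecutor() {
            return ioExecutor;
        }
    }

    public static void main(String[] args) {
        // Runnable::run is a same-thread Executor, enough for a demo.
        Service service = Service.create("localhost", Runnable::run);
        System.out.println(service.hostname() + " running=" + service.isRunning());
    }
}
```

Keeping the constructor private means every instance goes through create, so the context is always built in one place.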

2.1 LeaderElectionService Initialization

        this.leaderElectionService =
                rmProcessContext
                        .getHighAvailabilityServices()
                        .getResourceManagerLeaderElectionService();
	
    // org.apache.flink.runtime.resourcemanager.ResourceManagerProcessContext#getHighAvailabilityServices
    public HighAvailabilityServices getHighAvailabilityServices() {
        return highAvailabilityServices;
    }

    // org.apache.flink.runtime.highavailability.AbstractHaServices#getResourceManagerLeaderElectionService
    // Taking AbstractHaServices as an example; the concrete subclass supplies the implementation
    @Override
    public LeaderElectionService getResourceManagerLeaderElectionService() {
        return createLeaderElectionService(getLeaderPathForResourceManager());
    }

    // org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices#createLeaderElectionService
    @Override
    protected LeaderElectionService createLeaderElectionService(String leaderPath) {
        return ZooKeeperUtils.createLeaderElectionService(getCuratorFramework(), leaderPath);
    }

    // org.apache.flink.runtime.util.ZooKeeperUtils#createLeaderElectionService
    public static DefaultLeaderElectionService createLeaderElectionService(
            final CuratorFramework client, final String path) {
        return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));
    }

highAvailabilityServices is the HA service that ClusterEntrypoint initialized. Taking ZooKeeperHaServices as an example, the call chain ends up creating a DefaultLeaderElectionService.
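The call chain in this section is a template method: the abstract base class decides which leader path to use, and the concrete subclass decides how to build the election service for that path. Here is a minimal sketch of that division of labor. ElectionService, AbstractHa, and ZooKeeperLikeHa are hypothetical stand-ins, and the path string is invented for the example, so none of this reflects Flink's real paths or classes.

```java
class HaServicesSketch {

    /** Hypothetical stand-in for LeaderElectionService; it only reports its backing path. */
    interface ElectionService {
        String describe();
    }

    /** Plays the role of AbstractHaServices: picks the path, defers construction. */
    abstract static class AbstractHa {
        ElectionService getResourceManagerLeaderElectionService() {
            return createLeaderElectionService(getLeaderPathForResourceManager());
        }

        // Hypothetical path value, chosen for this example only.
        String getLeaderPathForResourceManager() {
            return "/resource_manager";
        }

        protected abstract ElectionService createLeaderElectionService(String leaderPath);
    }

    /** Plays the role of ZooKeeperHaServices: supplies the backend-specific service. */
    static final class ZooKeeperLikeHa extends AbstractHa {
        @Override
        protected ElectionService createLeaderElectionService(String leaderPath) {
            return () -> "zookeeper election at " + leaderPath;
        }
    }

    public static void main(String[] args) {
        AbstractHa ha = new ZooKeeperLikeHa();
        // prints "zookeeper election at /resource_manager"
        System.out.println(ha.getResourceManagerLeaderElectionService().describe());
    }
}
```

With this split, a different HA backend only has to override createLeaderElectionService, and callers such as the ResourceManagerServiceImpl constructor stay unchanged.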

3. Starting ResourceManagerServiceImpl

// org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create
            log.debug("Starting ResourceManagerService.");
            resourceManagerService.start();

This call resolves to ResourceManagerServiceImpl#start:

    @Override
    public void start() throws Exception {
        synchronized (lock) {
            if (running) {
                LOG.debug("Resource manager service has already started.");
                return;
            }
            running = true;
        }

        LOG.info("Starting resource manager service.");
        // Calls DefaultLeaderElectionService.start, passing in this ResourceManagerServiceImpl
        leaderElectionService.start(this);
    }

From here on, the leader election service takes over. The election flow is the same as the one for WebMonitorEndpoint.
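To make the hand-off concrete, here is a toy model of the two ideas seen in start(): a guard that makes repeated calls harmless, and a contender that registers itself with an election service and is called back once it becomes leader. Contender and SingleContenderElection are simplified, hypothetical stand-ins for Flink's LeaderContender and DefaultLeaderElectionService. The sketch also assumes that leadership callbacks are handed to the single-threaded handleLeaderEventExecutor created in the constructor, and it leaves out the actual ResourceManager startup entirely.

```java
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LeaderElectionSketch {

    /** Simplified stand-in for Flink's LeaderContender (only two callbacks modeled). */
    interface Contender {
        void grantLeadership(UUID leaderSessionId);

        void revokeLeadership();
    }

    /** Toy election service: with a single contender, it wins as soon as it registers. */
    static final class SingleContenderElection {
        private Contender contender;

        void start(Contender contender) {
            this.contender = contender;
            contender.grantLeadership(UUID.randomUUID());
        }

        void revoke() {
            if (contender != null) {
                contender.revokeLeadership();
            }
        }
    }

    /** Toy service mirroring the start guard and single-threaded event handling. */
    static final class ServiceSketch implements Contender {
        private final Object lock = new Object();
        private final ExecutorService handleLeaderEventExecutor =
                Executors.newSingleThreadExecutor();
        private final SingleContenderElection election;
        private boolean running; // guarded by lock
        private UUID leaderSessionId; // guarded by lock; null while not leader

        ServiceSketch(SingleContenderElection election) {
            this.election = election;
        }

        void start() {
            synchronized (lock) {
                if (running) {
                    return; // a second call is a no-op
                }
                running = true;
            }
            election.start(this);
        }

        @Override
        public void grantLeadership(UUID newLeaderSessionId) {
            // One thread handles all events, so grant and revoke are applied in order.
            handleLeaderEventExecutor.execute(() -> {
                synchronized (lock) {
                    leaderSessionId = newLeaderSessionId;
                }
            });
        }

        @Override
        public void revokeLeadership() {
            handleLeaderEventExecutor.execute(() -> {
                synchronized (lock) {
                    leaderSessionId = null;
                }
            });
        }

        /** Stops accepting events, waits for queued ones, returns the final session ID. */
        UUID shutdownAndGetLeaderSessionId() {
            handleLeaderEventExecutor.shutdown();
            try {
                handleLeaderEventExecutor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            synchronized (lock) {
                return leaderSessionId;
            }
        }
    }

    public static void main(String[] args) {
        ServiceSketch service = new ServiceSketch(new SingleContenderElection());
        service.start();
        service.start(); // ignored: already running
        System.out.println("leader: " + (service.shutdownAndGetLeaderSessionId() != null));
    }
}
```

Funnelling every callback through one thread is what keeps a grant followed quickly by a revoke from being applied out of order.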
