Flink 1.15 Source Code Analysis: ClusterEntrypoint

Posted by 宝哥大数据


ClusterEntrypoint is the base class for Flink cluster entry points.

1. Fields


    public static final ConfigOption<String> INTERNAL_CLUSTER_EXECUTION_MODE =
            ConfigOptions.key("internal.cluster.execution-mode")
                    .stringType()
                    .defaultValue(ExecutionMode.NORMAL.toString());

    protected static final Logger LOG = LoggerFactory.getLogger(ClusterEntrypoint.class);

    protected static final int STARTUP_FAILURE_RETURN_CODE = 1;
    protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;

    private static final Time INITIALIZATION_SHUTDOWN_TIMEOUT = Time.seconds(30L);

    /** The lock to guard startup / shutdown / manipulation methods. */
    private final Object lock = new Object();

    private final Configuration configuration;

    private final CompletableFuture<ApplicationStatus> terminationFuture;

    private final AtomicBoolean isShutDown = new AtomicBoolean(false);

    @GuardedBy("lock")
    private DeterminismEnvelope<ResourceID> resourceId;

    /**
     * Wraps the three core components: Dispatcher, ResourceManager and WebMonitorEndpoint.
     * Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link
     * WebMonitorEndpoint} in the same process.
     */
    @GuardedBy("lock")
    private DispatcherResourceManagerComponent clusterComponent;


    // The eight services ================================ start
    @GuardedBy("lock")
    private MetricRegistryImpl metricRegistry;

    @GuardedBy("lock")
    private ProcessMetricGroup processMetricGroup;

    @GuardedBy("lock")
    private HighAvailabilityServices haServices;

    @GuardedBy("lock")
    private BlobServer blobServer;

    @GuardedBy("lock")
    private HeartbeatServices heartbeatServices;

    @GuardedBy("lock")
    private RpcService commonRpcService;

    @GuardedBy("lock")
    private ExecutorService ioExecutor;
    
    private ExecutionGraphInfoStore executionGraphInfoStore;
    // The eight services ================================ end

    private RpcSystem rpcSystem;
    
    @GuardedBy("lock")
    private DeterminismEnvelope<WorkingDirectory> workingDirectory;

    private final Thread shutDownHook;

1.1 DispatcherResourceManagerComponent: the three core components

1.2 The eight services

1.2.1 RpcService

An Akka-based RpcService implementation that wraps an ActorSystem internally. In effect it is a TCP-based RPC server; its default port is 6123.

1.2.2 JMXService

Starts a JMXService so that clients can connect to the JobManager JVM for monitoring.

1.2.3 ExecutorService

Starts a thread pool for I/O work. By default its size is four times the number of CPU cores of the current node.
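
The sizing rule can be tried out without Flink. The following is a minimal sketch using only the JDK; `IoPoolSketch`, `defaultPoolSize` and `namedFactory` are hypothetical names, and `namedFactory` is only a rough stand-in for Flink's `ExecutorThreadFactory`, not its actual implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class IoPoolSketch {

    /** Default described in the text: 4 x the number of available CPU cores. */
    static int defaultPoolSize() {
        return 4 * Runtime.getRuntime().availableProcessors();
    }

    /** Hypothetical stand-in for a naming thread factory: numbered daemon threads. */
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread thread =
                    new Thread(runnable, prefix + "-thread-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    /** Fixed-size pool, analogous to the "cluster-io" executor of the entrypoint. */
    static ExecutorService createIoExecutor() {
        return Executors.newFixedThreadPool(defaultPoolSize(), namedFactory("cluster-io"));
    }

    public static void main(String[] args) throws Exception {
        ExecutorService ioExecutor = createIoExecutor();
        Callable<String> task = () -> Thread.currentThread().getName();
        System.out.println(ioExecutor.submit(task).get() + ", pool size " + defaultPoolSize());
        ioExecutor.shutdown();
        ioExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

A fixed pool sized as a multiple of the core count suits blocking I/O tasks, because many of the threads are expected to be waiting rather than computing.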

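1.2.4 HighAvailabilityServices

Initializes the HA services, for example the ZooKeeper-based ZooKeeperHaServices when ZooKeeper high availability is configured. They give access to everything high availability needs: registries, distributed counters and leader election.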

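1.2.5 BlobServer

Initializes the BlobServer, the server side of Flink's large-file (BLOB) storage. Large files are, for example, the JARs that a Flink job depends on and that are uploaded together with the job JAR, or log files uploaded by TaskManagers.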

1.2.6 HeartbeatServices

Provides everything heartbeating needs, including the creation of heartbeat receivers and heartbeat senders.

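1.2.7 MetricRegistryImpl

Starts the metric (performance monitoring) services. It also starts an ActorSystem internally and keeps track of all registered metrics, acting as the link between MetricGroups and MetricReporters.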

1.2.8 ExecutionGraphInfoStore

Stores the serializable form of the ExecutionGraph. Note that this is not the JobGraphStore; the JobGraphStore is started when the Dispatcher starts.

2. Initialization

2.1 Creating the ClusterEntrypoint object

    // Constructor of the subclass
    public StandaloneSessionClusterEntrypoint(Configuration configuration) {
        super(configuration);
    }

    // Reached through super(configuration)
    protected ClusterEntrypoint(Configuration configuration) {
        this.configuration = generateClusterConfiguration(configuration);
        this.terminationFuture = new CompletableFuture<>();

        if (configuration.get(JobManagerOptions.SCHEDULER_MODE) == SchedulerExecutionMode.REACTIVE
                && !supportsReactiveMode()) {
            final String msg =
                    "Reactive mode is configured for an unsupported cluster type. At the moment, reactive mode is only supported by standalone application clusters (bin/standalone-job.sh).";
            // log message as well, otherwise the error is only shown in the .out file of the
            // cluster
            LOG.error(msg);
            throw new IllegalConfigurationException(msg);
        }

        shutDownHook =
                ShutdownHookUtil.addShutdownHook(
                        () -> this.closeAsync().join(), getClass().getSimpleName(), LOG);
    }
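
The constructor registers a JVM shutdown hook that blocks on closeAsync(). The idea can be sketched with the JDK alone. `ShutdownHookSketch` and `releaseCount` are hypothetical names, and the sketch calls `Runtime.addShutdownHook` directly instead of Flink's `ShutdownHookUtil`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class ShutdownHookSketch implements AutoCloseable {
    private final AtomicBoolean isShutDown = new AtomicBoolean(false);
    private final AtomicInteger releases = new AtomicInteger();
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    private final Thread shutDownHook;

    ShutdownHookSketch() {
        // The hook waits for the asynchronous close to finish before the JVM exits.
        shutDownHook =
                new Thread(() -> closeAsync().join(), getClass().getSimpleName() + "-hook");
        Runtime.getRuntime().addShutdownHook(shutDownHook);
    }

    /** Idempotent: only the first caller releases resources, all callers share one future. */
    CompletableFuture<Void> closeAsync() {
        if (isShutDown.compareAndSet(false, true)) {
            releases.incrementAndGet(); // placeholder for stopping services
            terminationFuture.complete(null);
        }
        return terminationFuture;
    }

    int releaseCount() {
        return releases.get();
    }

    @Override
    public void close() {
        closeAsync().join();
        try {
            // On a regular close the hook is no longer needed.
            Runtime.getRuntime().removeShutdownHook(shutDownHook);
        } catch (IllegalStateException e) {
            // The JVM is already shutting down; the hook is running or has run.
        }
    }

    public static void main(String[] args) {
        try (ShutdownHookSketch entrypoint = new ShutdownHookSketch()) {
            System.out.println("running, releases so far: " + entrypoint.releaseCount());
        }
    }
}
```

The compare-and-set guard is what makes it safe for the hook and a regular shutdown path to race: whichever comes second just receives the already completed future.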

3. startCluster

3.1 Entering startCluster

    public void startCluster() throws ClusterEntrypointException {
        LOG.info("Starting {}.", getClass().getSimpleName());

        try {
            FlinkSecurityManager.setFromConfiguration(configuration);
            PluginManager pluginManager =
                    PluginUtils.createPluginManagerFromRootFolder(configuration);
            configureFileSystems(configuration, pluginManager);

            SecurityContext securityContext = installSecurityContext(configuration);

            ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
            // Start the cluster inside the security context
            securityContext.runSecured(
                    (Callable<Void>)
                            () -> {
                                runCluster(configuration, pluginManager);

                                return null;
                            });

        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);

            try {
                // clean up any partial state
                shutDownAsync(
                                ApplicationStatus.FAILED,
                                ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                ExceptionUtils.stringifyException(strippedThrowable),
                                false)
                        .get(
                                INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(),
                                TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                strippedThrowable.addSuppressed(e);
            }

            throw new ClusterEntrypointException(
                    String.format(
                            "Failed to initialize the cluster entrypoint %s.",
                            getClass().getSimpleName()),
                    strippedThrowable);
        }
    }

startCluster calls runCluster inside the security context. This is somewhat similar to what happens when a job is submitted with flink run.
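
The error handling in startCluster follows a reusable pattern: if startup fails, wait a bounded time for an asynchronous cleanup, attach any cleanup failure as a suppressed exception, then rethrow a wrapped error. Below is a minimal JDK-only sketch of that pattern. `StartupCleanupSketch`, `StartupException`, `tryStart` and the 200 ms timeout are hypothetical, and the exception stripping and security context of the original are left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class StartupCleanupSketch {
    /** Hypothetical bound on how long cleanup may take (the original uses 30 seconds). */
    static final long CLEANUP_TIMEOUT_MILLIS = 200L;

    static final class StartupException extends Exception {
        StartupException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static void start(Runnable startup, Supplier<CompletableFuture<Void>> cleanup)
            throws StartupException {
        try {
            startup.run();
        } catch (Throwable t) {
            try {
                // Clean up partial state, but never wait longer than the timeout.
                cleanup.get().get(CLEANUP_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                // Keep the original failure as the primary error.
                t.addSuppressed(e);
            }
            throw new StartupException("Failed to initialize.", t);
        }
    }

    /** Convenience wrapper: returns the failure instead of throwing it, or null on success. */
    static Throwable tryStart(Runnable startup, Supplier<CompletableFuture<Void>> cleanup) {
        try {
            start(startup, cleanup);
            return null;
        } catch (StartupException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable failure =
                tryStart(
                        () -> {
                            throw new IllegalStateException("boom");
                        },
                        () -> new CompletableFuture<Void>()); // cleanup that never finishes
        System.out.println(failure + ", cause: " + failure.getCause());
    }
}
```

Bounding the wait matters because cleanup runs on a process that has already failed; without the timeout a hanging cleanup would keep the failed process alive indefinitely.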

Next, let's look at runCluster.

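3.2 runCluster

runCluster does the following:

  • 1. Initialize the services
  • 2. Write the JobManager host and port into the configuration
  • 3. Create the component factory
  • 4. Use the factory to create and start clusterComponent (the wrapper around the three core components)
  • 5. Register a callback on the shutdown future that shuts the entrypoint down once the component terminates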

    private void runCluster(Configuration configuration, PluginManager pluginManager)
            throws Exception {
        synchronized (lock) {
            // 1. Initialize the services
            initializeServices(configuration, pluginManager);

            // write host information into configuration
            // 2. Write the JobManager host and port into the configuration
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

            // 3. Create the component factory
            final DispatcherResourceManagerComponentFactory
                    dispatcherResourceManagerComponentFactory =
                            createDispatcherResourceManagerComponentFactory(configuration);

            // 4. Use the factory to create and start clusterComponent
            //    (the wrapper around the three core components)
            clusterComponent =
                    dispatcherResourceManagerComponentFactory.create(
                            configuration,
                            resourceId.unwrap(),
                            ioExecutor,
                            commonRpcService,
                            haServices,
                            blobServer,
                            heartbeatServices,
                            metricRegistry,
                            executionGraphInfoStore,
                            new RpcMetricQueryServiceRetriever(
                                    metricRegistry.getMetricQueryServiceRpcService()),
                            this);

            // 5. Shut down when the component's shutdown future completes
            clusterComponent
                    .getShutDownFuture()
                    .whenComplete(
                            (ApplicationStatus applicationStatus, Throwable throwable) -> {
                                if (throwable != null) {
                                    shutDownAsync(
                                            ApplicationStatus.UNKNOWN,
                                            ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                            ExceptionUtils.stringifyException(throwable),
                                            false);
                                } else {
                                    // This is the general shutdown path. If a separate more
                                    // specific shutdown was
                                    // already triggered, this will do nothing
                                    shutDownAsync(
                                            applicationStatus,
                                            ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                            null,
                                            true);
                                }
                            });
        }
    }
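
Step 5 relies on CompletableFuture.whenComplete, which hands the callback either a result or a throwable. The following JDK-only sketch isolates that dispatch. `ShutdownFutureSketch`, its `Status` enum and the `shutdownCalls` list are hypothetical; the list merely records what a real entrypoint would pass to its shutdown routine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ShutdownFutureSketch {
    enum Status {
        SUCCEEDED,
        FAILED,
        UNKNOWN
    }

    /** Records "status:diagnostics" for every shutdown that was triggered. */
    final List<String> shutdownCalls = new ArrayList<>();

    void watch(CompletableFuture<Status> shutDownFuture) {
        shutDownFuture.whenComplete(
                (status, throwable) -> {
                    if (throwable != null) {
                        // The component failed: the final status cannot be determined.
                        shutDown(Status.UNKNOWN, throwable.getMessage());
                    } else {
                        // Regular path: forward the status reported by the component.
                        shutDown(status, null);
                    }
                });
    }

    private void shutDown(Status status, String diagnostics) {
        shutdownCalls.add(status + ":" + diagnostics);
    }

    public static void main(String[] args) {
        ShutdownFutureSketch entrypoint = new ShutdownFutureSketch();
        CompletableFuture<Status> future = new CompletableFuture<>();
        entrypoint.watch(future);
        future.completeExceptionally(new IllegalStateException("component crashed"));
        System.out.println(entrypoint.shutdownCalls);
    }
}
```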

3.2.1 initializeServices: initializing the services

    protected void initializeServices(Configuration configuration, PluginManager pluginManager)
            throws Exception {

        LOG.info("Initializing cluster services.");

        synchronized (lock) {
            // 1. Determine the JobManager's resourceId; generate a random one if none is configured
            resourceId =
                    configuration
                            .getOptional(JobManagerOptions.JOB_MANAGER_RESOURCE_ID)
                            .map(
                                    value ->
                                            DeterminismEnvelope.deterministicValue(
                                                    new ResourceID(value)))
                            .orElseGet(
                                    () ->
                                            DeterminismEnvelope.nondeterministicValue(
                                                    ResourceID.generate()));

            LOG.debug(
                    "Initialize cluster entrypoint {} with resource id {}.",
                    getClass().getSimpleName(),
                    resourceId);

            // 2. Create the JobManager's working directory
            workingDirectory =
                    ClusterEntrypointUtils.createJobManagerWorkingDirectory(
                            configuration, resourceId);
            // Example log output:
            // Using working directory: WorkingDirectory(/tmp/jm_d5e86014d966cefbbac51289de71fecb).
            LOG.info("Using working directory: {}.", workingDirectory);

            // 3. Load the rpcSystem
            rpcSystem = RpcSystem.load(configuration);

            // 4. Initialize the rpcService
            commonRpcService =
                    RpcUtils.createRemoteRpcService(
                            rpcSystem,
                            configuration,
                            configuration.getString(JobManagerOptions.ADDRESS),
                            getRPCPortRange(configuration),
                            configuration.getString(JobManagerOptions.BIND_HOST),
                            configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));

            // 5. Start a JMXService so that clients can connect to the JobManager JVM for monitoring
            JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

            // update the configuration used to create the high availability services
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

            // 6. Initialize the I/O thread pool; by default its size is 4 x the CPU cores of this node
            ioExecutor =
                    Executors.newFixedThreadPool(
                            ClusterEntrypointUtils.getPoolSize(configuration),
                            new ExecutorThreadFactory("cluster-io"));

            // 7. Create the HA services
            haServices = createHaServices(configuration, ioExecutor, rpcSystem);

            // 8. Initialize and start the BlobServer (large-file storage), e.g. for JARs a job
            //    depends on or log files uploaded by TaskManagers
            blobServer =
                    BlobUtils.createBlobServer(
                            configuration,
                            Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
                            haServices.createBlobStore());
            blobServer.start();
            configuration.setString(BlobServerOptions.PORT, String.valueOf(blobServer.getPort()));

            // 9. Heartbeat services
            heartbeatServices = createHeartbeatServices(configuration);

            // 10. Start the metric (performance monitoring) services
            metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);
            final RpcService metricQueryServiceRpcService =
                    MetricUtils.startRemoteMetricsRpcService(
                            configuration,
                            commonRpcService.getAddress(),
                            configuration.getString(JobManagerOptions.BIND_HOST),
                            rpcSystem);
            metricRegistry.startQueryService(metricQueryServiceRpcService, null);

            // 11. processMetricGroup
            final String hostname = RpcUtils.getHostname(commonRpcService);
            processMetricGroup =
                    MetricUtils.instantiateProcessMetricGroup(
                            metricRegistry,
                            hostname,
                            ConfigurationUtils.getSystemResourceMetricsProbingInterval(
                                    configuration));

            // 12. Initialize the store for ExecutionGraphs; depending on the subclass this is a
            //     FileExecutionGraphInfoStore or a MemoryExecutionGraphInfoStore
            executionGraphInfoStore =
                    createSerializableExecutionGraphStore(
                            configuration, commonRpcService.getScheduledExecutor());
        }
    }
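
Step 1 uses a configured resource id if there is one and generates a random id otherwise, tagging the result as deterministic or nondeterministic. The sketch below reproduces that Optional chain with plain JDK types. `ResourceIdSketch` and `Envelope` are hypothetical; `Envelope` only mimics the role of `DeterminismEnvelope` (a value plus a flag) and is not Flink's class.

```java
import java.util.Optional;
import java.util.UUID;

final class ResourceIdSketch {

    /** Hypothetical stand-in for DeterminismEnvelope: a value and how it was obtained. */
    static final class Envelope<T> {
        final T value;
        final boolean deterministic;

        Envelope(T value, boolean deterministic) {
            this.value = value;
            this.deterministic = deterministic;
        }

        @Override
        public String toString() {
            return (deterministic ? "deterministic(" : "nondeterministic(") + value + ")";
        }
    }

    /** Configured id wins; otherwise a random id is generated and marked nondeterministic. */
    static Envelope<String> resolve(Optional<String> configuredId) {
        return configuredId
                .map(value -> new Envelope<String>(value, true))
                .orElseGet(() -> new Envelope<String>(UUID.randomUUID().toString(), false));
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.of("jobmanager-1")));
        System.out.println(resolve(Optional.empty()));
    }
}
```

Keeping the flag next to the value lets later code, such as the working directory setup in step 2, tell whether the id will be the same after a restart.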

3.2.1.7 Creating the HA services

            // 7. Create the HA services
            haServices = createHaServices(configuration, ioExecutor, rpcSystem);

org.apache.flink.runtime.entrypoint.ClusterEntrypoint#createHaServices

    protected HighAvailabilityServices createHaServices(
            Configuration configuration, Executor executor, RpcSystemUtils rpcSystemUtils)
            throws Exception {
        return HighAvailabilityServicesUtils.createHighAvailabilityServices(
                configuration,
                executor,
                AddressResolution.NO_ADDRESS_RESOLUTION,
                rpcSystemUtils,
                this);
    }

A different HighAvailabilityServices implementation is created depending on the configured high-availability mode.

    public static HighAvailabilityServices createHighAvailabilityServices(
            Configuration configuration,
            Executor executor,
            AddressResolution addressResolution,
            RpcSystemUtils rpcSystemUtils,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {

        HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);

        switch (highAvailabilityMode) {
            case NONE:
                final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);

                final String resourceManagerRpcUrl =
                        rpcSystemUtils.getRpcUrl(
                                hostnamePort.f0,
                                hostnamePort.f1,
                                RpcServiceUtils.createWildcardName(
                                        ResourceManager.RESOURCE_MANAGER_NAME),
                                addressResolution,
                                configuration);
                final String dispatcherRpcUrl =
                        rpcSystemUtils.getRpcUrl(
                                hostnamePort.f0,
                                hostnamePort.f1,
                                RpcServiceUtils.createWildcardName(Dispatcher.DISPATCHER_NAME),
                                addressResolution,
                                configuration);
                final String webMonitorAddress =
                        getWebMonitorAddress(configuration, addressResolution);

                return new StandaloneHaServices(
                        resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
            case ZOOKEEPER:
                return createZooKeeperHaServices(configuration, executor, fatalErrorHandler);

            case FAC
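
The dispatch on the configured mode can be illustrated in isolation. The following is a minimal sketch and not Flink's code: `HaModeSketch`, its two-value `Mode` enum and the `HaServices` interface are hypothetical, and only the two branches fully visible in the listing are modelled.

```java
import java.util.Locale;

final class HaModeSketch {
    /** Hypothetical subset of modes; only the branches shown in full above. */
    enum Mode {
        NONE,
        ZOOKEEPER
    }

    /** Hypothetical minimal service interface for the sketch. */
    interface HaServices {
        String describe();
    }

    /** Parses the configured value; a missing value means no high availability. */
    static Mode fromConfig(String value) {
        if (value == null || value.trim().isEmpty()) {
            return Mode.NONE;
        }
        return Mode.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    static HaServices create(String configuredMode, String jobManagerAddress) {
        Mode mode = fromConfig(configuredMode);
        switch (mode) {
            case NONE:
                // No leader election: the leader address is known from the configuration.
                return () -> "standalone, fixed leader at " + jobManagerAddress;
            case ZOOKEEPER:
                return () -> "zookeeper-backed leader election";
            default:
                throw new IllegalArgumentException("Unsupported mode: " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(create(null, "localhost:6123").describe());
        System.out.println(create("zookeeper", "localhost:6123").describe());
    }
}
```

In the NONE branch the addresses are derived directly from the configuration, which is why the entrypoint writes the RPC address and port into the configuration before creating the HA services.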
