Flink1.15源码解析--启动TaskManager

Posted 宝哥大数据[离职找工作中,大佬帮内推下]

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink1.15源码解析--启动TaskManager相关的知识,希望对你有一定的参考价值。

文章目录

一、前言

TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流

必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子

二、TaskManagerRunner

taskmanager.sh 脚本我们 知道 taskmanager的启动类是 org.apache.flink.runtime.taskexecutor.TaskManagerRunner

入口 main(), 前几行是检验参数,日志记录,然后调用 runTaskManagerProcessSecurely

    public static void main(String[] args) throws Exception 
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();

        if (maxOpenFileHandles != -1L) 
            LOG.info("Maximum number of open file descriptors is .", maxOpenFileHandles);
         else 
            LOG.info("Cannot determine the maximum number of open file descriptors");
        

        runTaskManagerProcessSecurely(args);
    

接下来我们查看 runTaskManagerProcessSecurely 方法, 就是加载 flink 的配置文件 flink-config.yaml.

    public static void runTaskManagerProcessSecurely(String[] args) 
        Configuration configuration = null;

        try 
            configuration = loadConfiguration(args);
         catch (FlinkParseException fpe) 
            LOG.error("Could not load the configuration.", fpe);
            System.exit(FAILURE_EXIT_CODE);
        
		
        runTaskManagerProcessSecurely(checkNotNull(configuration));
    

然后将解析完的配置传入 runTaskManagerProcessSecurely

    public static void runTaskManagerProcessSecurely(Configuration configuration) 
        // 加载配置
        FlinkSecurityManager.setFromConfiguration(configuration);

        // 启动插件管理器
        final PluginManager pluginManager =
                PluginUtils.createPluginManagerFromRootFolder(configuration);
        FileSystem.initialize(configuration, pluginManager);

        StateChangelogStorageLoader.initialize(pluginManager);

        int exitCode;
        Throwable throwable = null;

        ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
        try 
            // 这个地方我们见了很多次,组件启动,都是通过 SecurityUtils
            // 安装 安全模块
            SecurityUtils.install(new SecurityConfiguration(configuration));

            // 通过安全上下文 启动 taskManager
            exitCode =
                    SecurityUtils.getInstalledContext()
                            .runSecured(() -> runTaskManager(configuration, pluginManager));
         catch (Throwable t) 
            throwable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            exitCode = FAILURE_EXIT_CODE;
        

        if (throwable != null) 
            LOG.error("Terminating TaskManagerRunner with exit code .", exitCode, throwable);
         else 
            LOG.info("Terminating TaskManagerRunner with exit code .", exitCode);
        

        System.exit(exitCode);
    

runTaskManager 通过名字我们不难看出,离TaskManager的构建越来越近了。我们点进runTaskManager方法:


    public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
            throws Exception 
        final TaskManagerRunner taskManagerRunner;
        try 
            // 创建 TaskManagerRunner
            taskManagerRunner =
                    new TaskManagerRunner(
                            configuration,
                            pluginManager,
                            TaskManagerRunner::createTaskExecutorService); // 创建 TaskManagerRunner 相关服务
            // 启动 TaskManagerRunner
            taskManagerRunner.start();
         catch (Exception exception) 
            throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
        

        // 返回结果码
        try 
            return taskManagerRunner.getTerminationFuture().get().getExitCode();
         catch (Throwable t) 
            throw new FlinkException(
                    "Unexpected failure during runtime of TaskManagerRunner.",
                    ExceptionUtils.stripExecutionException(t));
        
    

在这个方法里做了两件事:

  • 1、构建了一个TaskManagerRunner

  • 2、启动TaskManagerRunner

    • 基础服务初始化

实际上,TaskManager启动的所有准备工作,都是在这个TaskManagerRunner中完成的。

2.1、创建 TaskManagerRunner

            // 创建 TaskManagerRunner
            taskManagerRunner =
                    new TaskManagerRunner(
                            configuration,
                            pluginManager,
                            TaskManagerRunner::createTaskExecutorService); // 创建 TaskManagerRunner 相关服务

2.1.1、创建 TaskExecutorService, 用于创建 TaskExecutor

TaskManagerRunner::createTaskExecutorService


   public static TaskExecutorService createTaskExecutorService(
            Configuration configuration,
            ResourceID resourceID,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            BlobCacheService blobCacheService,
            boolean localCommunicationOnly,
            ExternalResourceInfoProvider externalResourceInfoProvider,
            WorkingDirectory workingDirectory,
            FatalErrorHandler fatalErrorHandler)
            throws Exception 
		// 构建  TaskExecutor
        final TaskExecutor taskExecutor =
                startTaskManager(
                        configuration,
                        resourceID,
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        metricRegistry,
                        blobCacheService,
                        localCommunicationOnly,
                        externalResourceInfoProvider,
                        workingDirectory,
                        fatalErrorHandler);

        return TaskExecutorToServiceAdapter.createFor(taskExecutor);
    

2.2、启动 TaskManagerRunner

    public void start() throws Exception 
        synchronized (lock) 
        	// 
            startTaskManagerRunnerServices();
            taskExecutorService.start();
        
    

2.2.1、基础服务的初始化, 构建 TaskExecutorService

 private void startTaskManagerRunnerServices() throws Exception 
        synchronized (lock) 
            rpcSystem = RpcSystem.load(configuration);

            //  TaskManager 内部线程池,用来处理从节点内部各个组件的Io的线程池
            this.executor =
                    Executors.newScheduledThreadPool(
                            Hardware.getNumberCPUCores(), // 线程池大小为当前节点的cpu核心数
                            new ExecutorThreadFactory("taskmanager-future"));

            // 高可用服务
            highAvailabilityServices =
                    HighAvailabilityServicesUtils.createHighAvailabilityServices(
                            configuration,
                            executor,
                            AddressResolution.NO_ADDRESS_RESOLUTION,
                            rpcSystem,
                            this);

            // flink1.12 引入新功能 JMX服务,提供监控信息
            JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

            // 启动RPC服务,内部为Akka模型的ActorSystem
            rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);

            // 为TaskManager生成了一个ResourceID
            this.resourceId =
                    getTaskManagerResourceID(
                            configuration, rpcService.getAddress(), rpcService.getPort());

            this.workingDirectory =
                    ClusterEntrypointUtils.createTaskManagerWorkingDirectory(
                            configuration, resourceId);

            LOG.info("Using working directory: ", workingDirectory);

            // 初始化心跳服务,主要是初始化心跳间隔和心跳超时参数配置
            HeartbeatServices heartbeatServices =
                    HeartbeatServices.fromConfiguration(configuration);

            // 启动Metric(性能监控) 相关服务
            metricRegistry =
                    new MetricRegistryImpl(
                            MetricRegistryConfiguration.fromConfiguration(
                                    configuration,
                                    rpcSystem.getMaximumMessageSizeInBytes(configuration)),
                            ReporterSetup.fromConfiguration(configuration, pluginManager));

            final RpcService metricQueryServiceRpcService =
                    MetricUtils.startRemoteMetricsRpcService(
                            configuration,
                            rpcService.getAddress(),
                            configuration.getString(TaskManagerOptions.BIND_HOST),
                            rpcSystem);
            metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId.unwrap());

            // 在主节点启动的时候,事实上已经启动了有个BolbServer,
            // 从节点启动的时候,会启动一个BlobCacheService,做文件缓存的服务
            blobCacheService =
                    BlobUtils.createBlobCacheService(
                            configuration,
                            Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
                            highAvailabilityServices.createBlobStore(),
                            null);

            final ExternalResourceInfoProvider externalResourceInfoProvider =
                    ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(
                            configuration, pluginManager);

            //  创建得到一个TaskExecutorService,内部封装了TaskExecutor,同时TaskExecutor的构建也在内部完成
            taskExecutorService =
                    taskExecutorServiceFactory.createTaskExecutor(
                            this.configuration,
                            this.resourceId.unwrap(),
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            metricRegistry,
                            blobCacheService,
                            false,
                            externalResourceInfoProvider,
                            workingDirectory.unwrap(),
                            this);

            handleUnexpectedTaskExecutorServiceTermination();

            MemoryLogger.startIfConfigured(
                    LOG, configuration, terminationFuture.thenAccept(ignored -> ));
        
    

这里所做的工作和JobManager启动时一样,是一些基础服务的构建和启动,在这里一共做了以下这些工作:

  • 1、初始化了一个TaskManager内部线程池,用来处理从节点内部各个组件的IO,该线程池的大小为当前节点CPU的核心数。

  • 2、构建了一个高可用服务。

  • 3、初始化JMX服务,用于提供监控信息。

  • 4、启动RPC服务,内部为Akka模型的ActorSystem(点此查看Flink 1.13 源码解析前导——Akka通信模型)

  • 4、为TaskManager生成了一个ResourceID。

  • 5、初始化心跳服务,根据配置文件获取心跳间隔时间参数以及心跳超时参数

  • 6、初始化metric服务

  • 7、启动BlobCacheService服务,做文件缓存的服务。

  • 8、构建了一个TaskExecutorService,内部封装了TaskExecutor。

2.2.1.1、BlobCacheService的初始化

在这个构造方法里,主要做了两件事:

  • 1、初始化了一个持久化Blob缓存服务

  • 2、初始化了一个临时Blob缓存服务

在这两个服务的内部,都会在启动的时候启动一个定时服务,就是将过期的某个Job的对应资源都删除掉。

    public BlobCacheService(
            final Configuration blobClientConfig,
            final Reference<File> storageDir,
            final BlobView blobView,
            @Nullable final InetSocketAddress serverAddress)
            throws IOException 

        this(
                // 持久化Blob缓存服务
                new PermanentBlobCache(blobClientConfig, storageDir, blobView, serverAddress),
                // 临时Blob缓存服务
                new TransientBlobCache(blobClientConfig, storageDir, serverAddress));
    

以 PermanentBlobCache 为例

    @VisibleForTesting
    public PermanentBlobCache(
            final Configuration blobClientConfig,
            final Reference<File> storageDir,
            final BlobView blobView,
            @Nullable final InetSocketAddress serverAddress,
            BlobCacheSizeTracker blobCacheSizeTracker)
            throws IOException 
        super(
                blobClientConfig,
                storageDir,
                blobView,
                LoggerFactory.getLogger(PermanentBlobCache.class),
                serverAddress);

        // Initializing the clean up task
        this.cleanupTimer = new Timer(true);

        this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
        // TODO 启动定时任务
        this.cleanupTimer.schedule(
                new PermanentBlobCleanupTask(), // 任务
                cleanupInterval, cleanupInterval);
        // TODO 为永久BLOB文件提供缓存,包括每个作业的引用计数和分阶段清理。
        this.blobCacheSizeTracker = blobCacheSizeTracker;

        registerDetectedJobs();
    

我们可以看到有以下操作:

  • 1、首先在方法里通过引用计数的方式,获取所有job引用的资源文件。

  • 2、遍历这些文件,并判断是否过期。

  • 3、如果过期则删除该资源文件夹。

在临时缓存blob服务中也是一样的工作

接下来到了重要环节

2.3、TaskExecutor的初始化

// org.apache.flink.runtime.taskexecutor.TaskManagerRunner#createTaskExecutorService

    public static TaskExecutorService createTaskExecutorService(
            Configuration configuration,
            ResourceID resourceID,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            BlobCacheService blobCacheService,
            boolean localCommunicationOnly,
            ExternalResourceInfoProvider externalResourceInfoProvider,
            WorkingDirectory workingDirectory,
            FatalErrorHandler fatalErrorHandler)
            throws Exception 

        final TaskExecutor taskExecutor =
                startTaskManager(
                        configuration,
                        resourceID,
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        metricRegistry,
                        blobCacheService,
                        localCommunicationOnly,
                        externalResourceInfoProvider,
                        workingDirectory,
                        fatalErrorHandler);
        /*
         TODO 封装了一下TaskExecutor
          TaskExecutor是TaskExecutorToServiceAdapter的成员变量
          TaskExecutorToServiceAdapter是TaskManagerRunner的成员变量
         */
        return TaskExecutorToServiceAdapter.createFor(taskExecutor);
    

可以看到在这里真正初始化了一个TaskExecutor,并将TaskExecutor封装了一下,我们首先来看TaskExecutor的初始化,我们进入startTaskManager方法:

在该方法内部依然是初始化了一些基础服务:

返回Flink1.15源码解析-总目录

以上是关于Flink1.15源码解析--启动TaskManager的主要内容,如果未能解决你的问题,请参考以下文章

Flink1.15源码解析--启动JobManager----ResourceManager启动

Flink1.15源码解析--启动JobManager----ResourceManager启动

Flink1.15源码解析--启动JobManager----Dispatcher启动

Flink1.15源码解析--启动JobManager----Dispatcher启动

Flink1.15源码解析

Flink1.15源码解析--启动JobManager----WebMonitorEndpoint启动