Flink 1.15 Source Code Analysis: Starting the TaskManager

Posted by 宝哥大数据

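1. Preface

The TaskManager (also called a worker) executes the tasks of a job's dataflow, and buffers and exchanges the data streams.

There must always be at least one TaskManager. The smallest unit of resource scheduling in a TaskManager is the task slot. The number of task slots in a TaskManager indicates how many tasks it can process concurrently. Note that multiple operators may execute in a single task slot.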

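2. TaskManagerRunner

From the taskmanager.sh script we know that the TaskManager's entry class is org.apache.flink.runtime.taskexecutor.TaskManagerRunner.

2.1 TaskManagerRunner

The entry point is main(). Its first few lines perform startup checks and logging, and then it calls runTaskManagerProcessSecurely.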

    public static void main(String[] args) throws Exception {
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();

        if (maxOpenFileHandles != -1L) {
            LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
        } else {
            LOG.info("Cannot determine the maximum number of open file descriptors");
        }

        runTaskManagerProcessSecurely(args);
    }

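Next, let's look at runTaskManagerProcessSecurely(String[] args). It loads Flink's configuration file, flink-conf.yaml, and exits with FAILURE_EXIT_CODE if the arguments cannot be parsed.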

    public static void runTaskManagerProcessSecurely(String[] args) {
        Configuration configuration = null;

        try {
            configuration = loadConfiguration(args);
        } catch (FlinkParseException fpe) {
            LOG.error("Could not load the configuration.", fpe);
            System.exit(FAILURE_EXIT_CODE);
        }

        runTaskManagerProcessSecurely(checkNotNull(configuration));
    }
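The article does not show loadConfiguration itself, so the following is only a minimal sketch of the "parse the arguments, or give up with a failure exit code" pattern used above. The names `ArgParseSketchException` and `DynamicPropertiesSketch` are hypothetical, the `-Dkey=value` format is an assumption made for the illustration, and the method returns the exit code instead of calling System.exit so that it can be exercised in a test.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a parse failure; this is not Flink's FlinkParseException. */
class ArgParseSketchException extends Exception {
    ArgParseSketchException(String message) {
        super(message);
    }
}

class DynamicPropertiesSketch {
    static final int SUCCESS_EXIT_CODE = 0; // assumed value for this sketch
    static final int FAILURE_EXIT_CODE = 1; // assumed value for this sketch

    /** Collects -Dkey=value arguments into a map and rejects anything else. */
    static Map<String, String> parse(String[] args) throws ArgParseSketchException {
        Map<String, String> props = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (!arg.startsWith("-D") || eq < 0) {
                throw new ArgParseSketchException("Unexpected argument: " + arg);
            }
            props.put(arg.substring(2, eq), arg.substring(eq + 1));
        }
        return props;
    }

    /** Mirrors the try/catch above, but reports the outcome as an exit code. */
    static int loadOrFail(String[] args) {
        try {
            Map<String, String> props = parse(args);
            System.out.println("Loaded " + props.size() + " properties");
            return SUCCESS_EXIT_CODE;
        } catch (ArgParseSketchException e) {
            System.err.println("Could not load the configuration: " + e.getMessage());
            return FAILURE_EXIT_CODE;
        }
    }
}
```

Calling `DynamicPropertiesSketch.loadOrFail(new String[] {"-Dtaskmanager.numberOfTaskSlots=4"})` takes the success path, while an argument without the `-D` prefix takes the failure path.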

The parsed configuration is then passed to the overload runTaskManagerProcessSecurely(Configuration).

    public static void runTaskManagerProcessSecurely(Configuration configuration) {
        // Set up the Flink security manager from the configuration
        FlinkSecurityManager.setFromConfiguration(configuration);

        // Create the plugin manager and initialize the file systems with it
        final PluginManager pluginManager =
                PluginUtils.createPluginManagerFromRootFolder(configuration);
        FileSystem.initialize(configuration, pluginManager);

        StateChangelogStorageLoader.initialize(pluginManager);

        int exitCode;
        Throwable throwable = null;

        ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
        try {
            // We have seen this many times: components are started through SecurityUtils.
            // Install the security modules
            SecurityUtils.install(new SecurityConfiguration(configuration));

            // Start the TaskManager inside the installed security context
            exitCode =
                    SecurityUtils.getInstalledContext()
                            .runSecured(() -> runTaskManager(configuration, pluginManager));
        } catch (Throwable t) {
            throwable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            exitCode = FAILURE_EXIT_CODE;
        }

        if (throwable != null) {
            LOG.error("Terminating TaskManagerRunner with exit code {}.", exitCode, throwable);
        } else {
            LOG.info("Terminating TaskManagerRunner with exit code {}.", exitCode);
        }

        System.exit(exitCode);
    }
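To see the control flow of this method in isolation, here is a minimal, self-contained sketch of the same pattern: run a callable inside some context, strip the UndeclaredThrowableException wrapper if the call fails, and map the outcome to an exit code. `SecuredRunnerSketch` and `SecureLaunchSketch` are hypothetical names and not Flink classes, the exit code value 1 is an assumption, and the sketch returns the code rather than calling System.exit.

```java
import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.Callable;

/** Hypothetical stand-in for a security context; this is not Flink's SecurityContext. */
interface SecuredRunnerSketch {
    int runSecured(Callable<Integer> action) throws Exception;
}

class SecureLaunchSketch {
    static final int FAILURE_EXIT_CODE = 1; // assumed value for this sketch

    /** Unwraps UndeclaredThrowableException layers to expose the underlying cause. */
    static Throwable strip(Throwable t) {
        while (t instanceof UndeclaredThrowableException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    /** Runs the action inside the context and maps any failure to the failure exit code. */
    static int launch(SecuredRunnerSketch context, Callable<Integer> action) {
        try {
            return context.runSecured(action);
        } catch (Throwable t) {
            Throwable cause = strip(t);
            System.err.println(
                    "Terminating with exit code " + FAILURE_EXIT_CODE + ": " + cause);
            return FAILURE_EXIT_CODE;
        }
    }
}
```

With a pass-through context such as `action -> action.call()`, `SecureLaunchSketch.launch(context, () -> 0)` simply returns the action's own exit code. Any exception thrown along the way results in the failure code instead.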

As the name runTaskManager suggests, we are getting closer to the construction of the TaskManager. Let's step into the runTaskManager method:


    public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
            throws Exception {
        final TaskManagerRunner taskManagerRunner;
        try {
            // Create the TaskManagerRunner
            taskManagerRunner =
                    new TaskManagerRunner(
                            configuration,
                            pluginManager,
                            TaskManagerRunner::createTaskExecutorService); // creates the TaskManagerRunner's services
            // Start the TaskManagerRunner
            taskManagerRunner.start();
        } catch (Exception exception) {
            throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
        }

        // Return the exit code
        try {
            return taskManagerRunner.getTerminationFuture().get().getExitCode();
        } catch (Throwable t) {
            throw new FlinkException(
                    "Unexpected failure during runtime of TaskManagerRunner.",
                    ExceptionUtils.stripExecutionException(t));
        }
    }
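The last part of runTaskManager blocks on a termination future and turns its result into an exit code. The sketch below illustrates that idea with plain JDK classes. `TerminationFutureSketch`, its `Result` enum, and the exit code values are hypothetical and chosen only for this example; they are not taken from the Flink source.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class TerminationFutureSketch {
    /** Hypothetical result type that carries the process exit code. */
    enum Result {
        SUCCESS(0),
        FAILURE(1);

        private final int exitCode;

        Result(int exitCode) {
            this.exitCode = exitCode;
        }

        int getExitCode() {
            return exitCode;
        }
    }

    private final CompletableFuture<Result> terminationFuture = new CompletableFuture<>();

    CompletableFuture<Result> getTerminationFuture() {
        return terminationFuture;
    }

    /** Signals that the runner has terminated with the given result. */
    void close(Result result) {
        terminationFuture.complete(result);
    }

    /** Blocks until termination, then returns the exit code or rethrows the real cause. */
    static int awaitExitCode(TerminationFutureSketch runner) throws Exception {
        try {
            return runner.getTerminationFuture().get().getExitCode();
        } catch (ExecutionException e) {
            // get() wraps failures in ExecutionException; report the underlying cause
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new Exception("Unexpected failure during runtime.", cause);
        }
    }
}
```

The main thread therefore stays parked in `get()` for the whole lifetime of the process, and only some other thread completing the future lets it return.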

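This method does two things:

  • 1. Constructs a TaskManagerRunner

  • 2. Starts the TaskManagerRunner

    • Initializes the basic services

In fact, all of the preparation for starting the TaskManager is done inside this TaskManagerRunner.

2.1.1 Creating the TaskManagerRunner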

            // Create the TaskManagerRunner
            taskManagerRunner =
                    new TaskManagerRunner(
                            configuration,
                            pluginManager,
                            TaskManagerRunner::createTaskExecutorService); // creates the TaskManagerRunner's services

2.1.1.1 Creating the TaskExecutorService, which is used to create the TaskExecutor

TaskManagerRunner::createTaskExecutorService


    public static TaskExecutorService createTaskExecutorService(
            Configuration configuration,
            ResourceID resourceID,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            BlobCacheService blobCacheService,
            boolean localCommunicationOnly,
            ExternalResourceInfoProvider externalResourceInfoProvider,
            WorkingDirectory workingDirectory,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {
        // Build the TaskExecutor
        final TaskExecutor taskExecutor =
                startTaskManager(
                        configuration,
                        resourceID,
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        metricRegistry,
                        blobCacheService,
                        localCommunicationOnly,
                        externalResourceInfoProvider,
                        workingDirectory,
                        fatalErrorHandler);

        return TaskExecutorToServiceAdapter.createFor(taskExecutor);
    }
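Two patterns meet in this method: a static method is handed to the runner as a factory through a method reference, and the object it builds is wrapped in an adapter before being returned. The sketch below shows both in a few lines of plain Java. Every name in it (`Worker`, `WorkerService`, `WorkerServiceFactory`, `WorkerToServiceAdapter`, `Runner`, the id "tm-1") is hypothetical and greatly simplified. It also assumes that the runner only invokes the factory when it is started, which the excerpts above do not show.

```java
class TaskExecutorFactorySketch {
    /** Hypothetical stand-in for the executor that gets wrapped. */
    static class Worker {
        final String resourceId;
        boolean started;

        Worker(String resourceId) {
            this.resourceId = resourceId;
        }

        void start() {
            started = true;
        }
    }

    /** Narrow lifecycle view that the runner depends on. */
    interface WorkerService {
        void start();

        String describe();
    }

    /** Factory shape that a static method reference can satisfy. */
    @FunctionalInterface
    interface WorkerServiceFactory {
        WorkerService create(String resourceId) throws Exception;
    }

    /** Adapter: exposes a Worker through the WorkerService interface. */
    static class WorkerToServiceAdapter implements WorkerService {
        private final Worker worker;

        private WorkerToServiceAdapter(Worker worker) {
            this.worker = worker;
        }

        static WorkerToServiceAdapter createFor(Worker worker) {
            return new WorkerToServiceAdapter(worker);
        }

        @Override
        public void start() {
            worker.start();
        }

        @Override
        public String describe() {
            return "worker[" + worker.resourceId + "] started=" + worker.started;
        }
    }

    /** Matches WorkerServiceFactory, so it can be passed as a method reference. */
    static WorkerService createWorkerService(String resourceId) {
        return WorkerToServiceAdapter.createFor(new Worker(resourceId));
    }

    /** Stores the factory at construction time and calls it only in start(). */
    static class Runner {
        private final WorkerServiceFactory factory;
        private WorkerService service;

        Runner(WorkerServiceFactory factory) {
            this.factory = factory;
        }

        void start() throws Exception {
            service = factory.create("tm-1");
            service.start();
        }

        WorkerService service() {
            return service;
        }
    }
}
```

A caller would write `new TaskExecutorFactorySketch.Runner(TaskExecutorFactorySketch::createWorkerService)` and then call `start()`. Passing the factory rather than a finished object lets a different factory be substituted, for example in tests, without changing the runner.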
