Flink 1.15 Source Code Analysis: Starting the TaskManager
Posted by 宝哥大数据
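1. Introduction
The TaskManager (also called a worker) executes the tasks of a job's dataflow, and buffers and exchanges the data streams.
There must always be at least one TaskManager. The smallest unit of resource scheduling in a TaskManager is the task slot. The number of task slots in a TaskManager indicates how many tasks it can process concurrently. Note that multiple operators may execute in a single task slot.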
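2. TaskManagerRunner
From the taskmanager.sh script we know that the startup class of the TaskManager is org.apache.flink.runtime.taskexecutor.TaskManagerRunner.
2.1 TaskManagerRunner
The entry point is main(). Its first few lines perform startup checks and logging, and then it calls runTaskManagerProcessSecurely.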
public static void main(String[] args) throws Exception {
    // startup checks and logging
    EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
    SignalHandler.register(LOG);
    JvmShutdownSafeguard.installAsShutdownHook(LOG);
    long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();
    if (maxOpenFileHandles != -1L) {
        LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
    } else {
        LOG.info("Cannot determine the maximum number of open file descriptors");
    }
    runTaskManagerProcessSecurely(args);
}
Next, look at runTaskManagerProcessSecurely(String[] args), which loads Flink's configuration file, flink-conf.yaml.
public static void runTaskManagerProcessSecurely(String[] args) {
    Configuration configuration = null;
    try {
        configuration = loadConfiguration(args);
    } catch (FlinkParseException fpe) {
        LOG.error("Could not load the configuration.", fpe);
        System.exit(FAILURE_EXIT_CODE);
    }
    runTaskManagerProcessSecurely(checkNotNull(configuration));
}
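The parse-then-delegate shape of this overload can be sketched in plain Java. This is only an illustration under stated assumptions: ConfigLoadingSketch, ParseException and loadOrNull are hypothetical names, the parser accepts only -Dkey=value arguments, and it returns null on failure where the real method calls System.exit. It is not how Flink's loadConfiguration is implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConfigLoadingSketch {

    /** Hypothetical stand-in for a parse failure such as FlinkParseException. */
    public static class ParseException extends Exception {
        public ParseException(String message) {
            super(message);
        }
    }

    /** Collects "-Dkey=value" arguments into a map; anything else is rejected. */
    public static Map<String, String> loadConfiguration(String[] args) throws ParseException {
        Map<String, String> configuration = new LinkedHashMap<>();
        for (String arg : args) {
            if (!arg.startsWith("-D") || !arg.contains("=")) {
                throw new ParseException("Unrecognized argument: " + arg);
            }
            int separator = arg.indexOf('=');
            configuration.put(arg.substring(2, separator), arg.substring(separator + 1));
        }
        return configuration;
    }

    /** Returns the parsed configuration, or null if parsing failed (the real code exits). */
    public static Map<String, String> loadOrNull(String[] args) {
        try {
            return loadConfiguration(args);
        } catch (ParseException e) {
            System.err.println("Could not load the configuration: " + e.getMessage());
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(loadOrNull(new String[] {"-Dtaskmanager.numberOfTaskSlots=4"}));
    }
}
```

Keeping argument parsing in its own overload means the second overload can assume a valid, non-null configuration, which is what the checkNotNull call above enforces.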
The parsed configuration is then passed to the runTaskManagerProcessSecurely(Configuration) overload:
public static void runTaskManagerProcessSecurely(Configuration configuration) {
    // Configure the FlinkSecurityManager from the loaded configuration
    FlinkSecurityManager.setFromConfiguration(configuration);
    // Create the plugin manager
    final PluginManager pluginManager =
            PluginUtils.createPluginManagerFromRootFolder(configuration);
    FileSystem.initialize(configuration, pluginManager);
    StateChangelogStorageLoader.initialize(pluginManager);
    int exitCode;
    Throwable throwable = null;
    ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
    try {
        // We have seen this pattern many times: components are started through SecurityUtils
        // Install the security modules
        SecurityUtils.install(new SecurityConfiguration(configuration));
        // Start the TaskManager within the installed security context
        exitCode =
                SecurityUtils.getInstalledContext()
                        .runSecured(() -> runTaskManager(configuration, pluginManager));
    } catch (Throwable t) {
        throwable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        exitCode = FAILURE_EXIT_CODE;
    }
    if (throwable != null) {
        LOG.error("Terminating TaskManagerRunner with exit code {}.", exitCode, throwable);
    } else {
        LOG.info("Terminating TaskManagerRunner with exit code {}.", exitCode);
    }
    System.exit(exitCode);
}
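The core of this method is "run a callable inside a security context and turn any failure into an exit code". A minimal stand-alone sketch of that idea follows. The SecurityContext interface here is a simplified, hypothetical stand-in rather than Flink's class, NoOpSecurityContext adds no security at all, and the value 1 for FAILURE_EXIT_CODE is an assumption made for illustration.

```java
import java.util.concurrent.Callable;

public class SecuredRunSketch {

    /** Assumed value, used only for this illustration. */
    public static final int FAILURE_EXIT_CODE = 1;

    /** Hypothetical stand-in for a security context that wraps a callable. */
    public interface SecurityContext {
        <T> T runSecured(Callable<T> securedCallable) throws Exception;
    }

    /** A context that adds no security and simply invokes the callable. */
    public static class NoOpSecurityContext implements SecurityContext {
        @Override
        public <T> T runSecured(Callable<T> securedCallable) throws Exception {
            return securedCallable.call();
        }
    }

    /** Runs the task inside the context and converts any failure into an exit code. */
    public static int runAndGetExitCode(SecurityContext context, Callable<Integer> task) {
        try {
            return context.runSecured(task);
        } catch (Throwable t) {
            System.err.println("Terminating with failure: " + t);
            return FAILURE_EXIT_CODE;
        }
    }

    public static void main(String[] args) {
        SecurityContext context = new NoOpSecurityContext();
        System.out.println(runAndGetExitCode(context, () -> 0));
        System.out.println(
                runAndGetExitCode(
                        context,
                        () -> {
                            throw new IllegalStateException("boom");
                        }));
    }
}
```

Catching Throwable at this outermost level means the process always ends with a well-defined exit code, whether the task returned normally or failed.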
As the name runTaskManager suggests, we are getting closer to the construction of the TaskManager. Let us step into runTaskManager:
public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    final TaskManagerRunner taskManagerRunner;
    try {
        // Create the TaskManagerRunner
        taskManagerRunner =
                new TaskManagerRunner(
                        configuration,
                        pluginManager,
                        TaskManagerRunner::createTaskExecutorService); // factory for the TaskManagerRunner's services
        // Start the TaskManagerRunner
        taskManagerRunner.start();
    } catch (Exception exception) {
        throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
    }
    // Return the exit code
    try {
        return taskManagerRunner.getTerminationFuture().get().getExitCode();
    } catch (Throwable t) {
        throw new FlinkException(
                "Unexpected failure during runtime of TaskManagerRunner.",
                ExceptionUtils.stripExecutionException(t));
    }
}
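The last step, blocking on a termination future and reading the exit code from its result, can be illustrated with a plain CompletableFuture. This is a sketch under assumptions: Runner and Result are hypothetical stand-ins that only mimic the shape of the calls in the method above, and the exit code values 0 and 1 are chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class TerminationFutureSketch {

    /** Hypothetical result type carrying the exit code of the process. */
    public enum Result {
        SUCCESS(0),
        FAILURE(1);

        private final int exitCode;

        Result(int exitCode) {
            this.exitCode = exitCode;
        }

        public int getExitCode() {
            return exitCode;
        }
    }

    /** Hypothetical runner: start() returns immediately, shutDown() completes the future. */
    public static class Runner {
        private final CompletableFuture<Result> terminationFuture = new CompletableFuture<>();

        public void start() {
            // Services would be started here; nothing to do in this sketch.
        }

        public void shutDown(Result result) {
            terminationFuture.complete(result);
        }

        public CompletableFuture<Result> getTerminationFuture() {
            return terminationFuture;
        }
    }

    /** Starts the runner, then blocks until it terminates and returns its exit code. */
    public static int run(Runner runner) {
        runner.start();
        try {
            return runner.getTerminationFuture().get().getExitCode();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Unexpected failure while waiting for termination", e);
        }
    }

    public static void main(String[] args) {
        Runner runner = new Runner();
        // Simulate a shutdown request arriving from another thread.
        new Thread(() -> runner.shutDown(Result.SUCCESS)).start();
        System.out.println("exit code: " + run(runner));
    }
}
```

Because start() returns right away, the calling thread stays parked on get() until some other thread completes the future. This is how the main thread of a long-running process can remain alive for the process lifetime without doing any work itself.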
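This method does two things:
- 1. Construct a TaskManagerRunner
- 2. Start the TaskManagerRunner
- Initialization of the basic services
In fact, all of the preparation for starting the TaskManager is done inside this TaskManagerRunner.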
2.1.1 Creating the TaskManagerRunner
// Create the TaskManagerRunner
taskManagerRunner =
        new TaskManagerRunner(
                configuration,
                pluginManager,
                TaskManagerRunner::createTaskExecutorService); // factory for the TaskManagerRunner's services
2.1.1.1 Creating the TaskExecutorService, which is used to create the TaskExecutor
TaskManagerRunner::createTaskExecutorService
public static TaskExecutorService createTaskExecutorService(
        Configuration configuration,
        ResourceID resourceID,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        BlobCacheService blobCacheService,
        boolean localCommunicationOnly,
        ExternalResourceInfoProvider externalResourceInfoProvider,
        WorkingDirectory workingDirectory,
        FatalErrorHandler fatalErrorHandler)
        throws Exception {
    // Build the TaskExecutor
    final TaskExecutor taskExecutor =
            startTaskManager(
                    configuration,
                    resourceID,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    metricRegistry,
                    blobCacheService,
                    localCommunicationOnly,
                    externalResourceInfoProvider,
                    workingDirectory,
                    fatalErrorHandler);
    return TaskExecutorToServiceAdapter.createFor(taskExecutor);
}
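Two ideas meet in this method: the runner receives a factory as a method reference, and the factory wraps the object it builds in an adapter that exposes a narrower service interface. The sketch below shows both in plain Java. All names (Worker, Service, ServiceFactory, WorkerToServiceAdapter) are hypothetical, and the single String parameter stands in for the much longer parameter list of the real factory.

```java
import java.util.concurrent.CompletableFuture;

public class FactoryAdapterSketch {

    /** Hypothetical worker with its own lifecycle methods. */
    public static class Worker {
        private final String resourceId;
        private boolean running;

        public Worker(String resourceId) {
            this.resourceId = resourceId;
        }

        public void startWorker() {
            running = true;
        }

        public CompletableFuture<Void> stopWorker() {
            running = false;
            return CompletableFuture.completedFuture(null);
        }

        public boolean isRunning() {
            return running;
        }

        public String getResourceId() {
            return resourceId;
        }
    }

    /** The narrow service interface that the runner depends on. */
    public interface Service {
        void start();

        CompletableFuture<Void> closeAsync();
    }

    /** Factory handed to the runner; a method reference can implement it. */
    @FunctionalInterface
    public interface ServiceFactory {
        Service create(String resourceId);
    }

    /** Adapter that exposes a Worker through the Service interface. */
    public static class WorkerToServiceAdapter implements Service {
        private final Worker worker;

        private WorkerToServiceAdapter(Worker worker) {
            this.worker = worker;
        }

        public static WorkerToServiceAdapter createFor(Worker worker) {
            return new WorkerToServiceAdapter(worker);
        }

        @Override
        public void start() {
            worker.startWorker();
        }

        @Override
        public CompletableFuture<Void> closeAsync() {
            return worker.stopWorker();
        }
    }

    /** Builds the worker and returns it wrapped in the adapter. */
    public static Service createService(String resourceId) {
        return WorkerToServiceAdapter.createFor(new Worker(resourceId));
    }

    public static void main(String[] args) {
        ServiceFactory factory = FactoryAdapterSketch::createService;
        Service service = factory.create("tm-1");
        service.start();
        service.closeAsync().join();
        System.out.println("service started and closed");
    }
}
```

Passing the factory in, rather than constructing the service directly, lets a caller substitute a different implementation (for example in tests) without changing the runner.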