[flink] Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode, TaskManager Startup
Posted by 九师兄
Preface: This article, compiled by the editors of 小常识网 (cha138.com), covers how the TaskManager starts up in Flink 1.12.2 yarn-per-job mode. We hope you find it useful.
1. Overview
Reposted from: Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode [Part 4]
Previous article: [flink] Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode, JobMaster Startup (YarnJobClusterEntrypoint)
[Figure: overall flow diagram]
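TaskManager startup at a glance
YarnTaskExecutorRunner: the TaskManager entry class in YARN mode
- Start the TaskExecutor
- Register its slots with the ResourceManager
- The ResourceManager allocates a slot
- On receiving the allocation request, the TaskExecutor offers the slot to the JobMaster (SlotPool)
- The JobMaster submits tasks to the TaskExecutor for execution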
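The five steps above can be modeled as a toy, in-memory message sequence. This is a minimal sketch, not Flink code: `ToyTaskExecutor`, `ToyResourceManager`, `ToyJobMaster` and their methods are hypothetical stand-ins for the real RPC endpoints, and every call is synchronous instead of going through RPC.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of the startup handshake. None of these
// classes exist in Flink; they only reproduce the order of the five steps.
class StartupFlowSketch {
    static final List<String> LOG = new ArrayList<>();

    static class ToyTaskExecutor {
        ToyTaskExecutor() {
            LOG.add("1. TaskExecutor started");
        }

        // Called by the ResourceManager once it has picked one of our slots.
        void requestSlot(String slotId, ToyJobMaster jobMaster) {
            LOG.add("4. TaskExecutor offers " + slotId + " to the JobMaster");
            jobMaster.offerSlot(slotId);
        }
    }

    static class ToyResourceManager {
        private final List<String> freeSlots = new ArrayList<>();
        private ToyTaskExecutor taskExecutor;

        void registerTaskExecutor(ToyTaskExecutor te, List<String> slots) {
            taskExecutor = te;
            freeSlots.addAll(slots);
            LOG.add("2. TaskExecutor registered slots " + slots);
        }

        void requestSlot(ToyJobMaster jobMaster) {
            String slotId = freeSlots.remove(0);
            LOG.add("3. ResourceManager allocates " + slotId);
            taskExecutor.requestSlot(slotId, jobMaster);
        }
    }

    static class ToyJobMaster {
        // Accepting the offer; the real JobMaster would then deploy a task.
        void offerSlot(String slotId) {
            LOG.add("5. JobMaster submits a task into " + slotId);
        }
    }

    static List<String> run() {
        LOG.clear();
        ToyTaskExecutor te = new ToyTaskExecutor();
        ToyResourceManager rm = new ToyResourceManager();
        ToyJobMaster jm = new ToyJobMaster();
        rm.registerTaskExecutor(te, List.of("slot-0", "slot-1"));
        rm.requestSlot(jm);
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the real system each arrow is an asynchronous RPC, and the JobMaster usually asks the ResourceManager for slots first; the sketch only fixes the order of the messages.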
2. Code Analysis
2.1. Entry Point
The TaskManager's entry point is YarnTaskExecutorRunner#main.
/**
 * The entry point for the YARN task executor runner.
 *
 * @param args The command line arguments.
 */
public static void main(String[] args) {
    // log JVM, Flink version and environment information
    EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args);
    // log received UNIX signals (TERM, HUP, INT)
    SignalHandler.register(LOG);
    // halt the JVM if shutdown hooks hang during shutdown
    JvmShutdownSafeguard.installAsShutdownHook(LOG);
    runTaskManagerSecurely(args);
}
2.2. TaskManagerRunner#runTaskManagerProcessSecurely
TaskManagerRunner.runTaskManagerProcessSecurely is the TaskManager's startup method.
It is reached from YarnTaskExecutorRunner#main through the following call chain:
YarnTaskExecutorRunner#main
--> YarnTaskExecutorRunner#runTaskManagerSecurely
--> TaskManagerRunner.runTaskManagerProcessSecurely(Preconditions.checkNotNull(configuration));
public static void runTaskManagerProcessSecurely(Configuration configuration) {
    replaceGracefulExitWithHaltIfConfigured(configuration);
    // create the PluginManager from the plugins root folder
    final PluginManager pluginManager =
            PluginUtils.createPluginManagerFromRootFolder(configuration);
    // initialize the file systems (plugins may contribute file system implementations)
    FileSystem.initialize(configuration, pluginManager);
    int exitCode;
    Throwable throwable = null;
    try {
        // install the security modules
        SecurityUtils.install(new SecurityConfiguration(configuration));
        exitCode =
                SecurityUtils.getInstalledContext()
                        // start the TaskManager ==> runTaskManager
                        .runSecured(() -> runTaskManager(configuration, pluginManager));
    } catch (Throwable t) {
        throwable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        exitCode = FAILURE_EXIT_CODE;
    }
    if (throwable != null) {
        LOG.error("Terminating TaskManagerRunner with exit code {}.", exitCode, throwable);
    } else {
        LOG.info("Terminating TaskManagerRunner with exit code {}.", exitCode);
    }
    System.exit(exitCode);
}
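The exit-code handling above boils down to: run the startup action, map any throwable to a failure code, and unwrap the UndeclaredThrowableException that a security context may wrap checked exceptions in. A minimal sketch under those assumptions: `ExitCodeSketch`, `runForExitCode` and `lastFailure` are hypothetical names, `strip` is a simplified stand-in for Flink's `ExceptionUtils.stripException`, and unlike the original it returns the code instead of calling `System.exit`.

```java
import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.Callable;

// Hypothetical helper mirroring the try/catch shape of runTaskManagerProcessSecurely.
class ExitCodeSketch {
    static final int FAILURE_EXIT_CODE = 1; // assumption: failure is reported as exit code 1
    static Throwable lastFailure;

    // Peel off UndeclaredThrowableException wrappers to reach the real cause.
    static Throwable strip(Throwable t) {
        while (t instanceof UndeclaredThrowableException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    // Runs the action and turns any throwable into FAILURE_EXIT_CODE.
    static int runForExitCode(Callable<Integer> action) {
        lastFailure = null;
        try {
            return action.call();
        } catch (Throwable t) {
            lastFailure = strip(t);
            return FAILURE_EXIT_CODE;
        }
    }

    public static void main(String[] args) {
        int ok = runForExitCode(() -> 0);
        int failed = runForExitCode(() -> {
            throw new UndeclaredThrowableException(new IllegalStateException("boom"));
        });
        System.out.println(ok + " " + failed + " " + lastFailure);
    }
}
```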
2.3. TaskManagerRunner#runTaskManager
Builds and starts the TaskManagerRunner, then blocks until it terminates and returns its exit code.
public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    final TaskManagerRunner taskManagerRunner;
    try {
        // build the TaskManagerRunner
        taskManagerRunner =
                new TaskManagerRunner(
                        configuration,
                        pluginManager,
                        TaskManagerRunner::createTaskExecutorService);
        // start the TaskManagerRunner
        taskManagerRunner.start();
    } catch (Exception exception) {
        throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
    }
    try {
        // block until the runner terminates, then return its exit code
        return taskManagerRunner.getTerminationFuture().get().getExitCode();
    } catch (Throwable t) {
        throw new FlinkException(
                "Unexpected failure during runtime of TaskManagerRunner.",
                ExceptionUtils.stripExecutionException(t));
    }
}
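runTaskManager starts the runner and then parks the calling thread on getTerminationFuture().get() until the TaskManager shuts down. The sketch below reproduces that start-then-await pattern with a plain CompletableFuture; `Result`, `ToyRunner` and the exit codes are hypothetical stand-ins, not Flink's types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class TerminationSketch {
    // Hypothetical result type carrying an exit code.
    enum Result {
        SUCCESS(0), FAILURE(1);

        private final int exitCode;

        Result(int exitCode) {
            this.exitCode = exitCode;
        }

        int getExitCode() {
            return exitCode;
        }
    }

    static class ToyRunner {
        private final CompletableFuture<Result> terminationFuture = new CompletableFuture<>();

        void start() {
            // services would be started here
        }

        CompletableFuture<Result> getTerminationFuture() {
            return terminationFuture;
        }

        // Called (possibly from another thread) when the TaskManager shuts down.
        void shutDown(Result result) {
            terminationFuture.complete(result);
        }
    }

    static int runAndAwait(ToyRunner runner, Result simulatedOutcome) {
        runner.start();
        // simulate an asynchronous shutdown triggered elsewhere
        new Thread(() -> runner.shutDown(simulatedOutcome)).start();
        try {
            // block the calling thread until the runner terminates
            return runner.getTerminationFuture().get().getExitCode();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for termination", e);
        } catch (ExecutionException e) {
            // mirrors the FlinkException wrapping in runTaskManager
            throw new IllegalStateException("Unexpected failure during runtime", e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait(new ToyRunner(), Result.SUCCESS));
    }
}
```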
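2.4. Starting the TaskManagerRunner
taskManagerRunner.start() in TaskManagerRunner#runTaskManager eventually leads to TaskExecutor#onStart: TaskManagerRunner#start calls taskExecutorService.start(), the TaskExecutorToServiceAdapter implementation forwards this to taskExecutor.start(), and because TaskExecutor is an RpcEndpoint, starting it makes the RPC framework invoke onStart().
TaskManagerRunner#start
public void start() throws Exception {
    taskExecutorService.start();
}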
TaskExecutorToServiceAdapter#start
@Override
public void start() {
    taskExecutor.start();
}
TaskExecutor#onStart
@Override
public void onStart() throws Exception {
    try {
        startTaskExecutorServices();
    } catch (Throwable t) {
        final TaskManagerException exception =
                new TaskManagerException(
                        String.format("Could not start the TaskExecutor %s", getAddress()), t);
        onFatalError(exception);
        throw exception;
    }
    startRegistrationTimeout();
}
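onStart ends with startRegistrationTimeout(): if the TaskExecutor has not registered with a ResourceManager within the configured duration, this is treated as a fatal error. This sketch assumes the timeout is implemented by tagging each armed timeout with a fresh UUID and ignoring it when it has been stopped or superseded. The class and method names are hypothetical, and the timer is a plain ScheduledExecutorService rather than Flink's main-thread executor.

```java
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class RegistrationTimeoutSketch {
    private volatile UUID currentTimeoutId;
    private volatile boolean fatalError;

    // Arms a new timeout; any previously armed one becomes stale.
    // The timer may be null so that tests can fire the timeout by hand.
    UUID startRegistrationTimeout(ScheduledExecutorService timer, long delayMillis) {
        UUID id = UUID.randomUUID();
        currentTimeoutId = id;
        if (timer != null) {
            timer.schedule(() -> registrationTimeout(id), delayMillis, TimeUnit.MILLISECONDS);
        }
        return id;
    }

    // Called once registration with the ResourceManager has succeeded.
    void stopRegistrationTimeout() {
        currentTimeoutId = null;
    }

    // Fires when the timer expires; only the latest, still-armed timeout counts.
    void registrationTimeout(UUID id) {
        if (id.equals(currentTimeoutId)) {
            fatalError = true; // the real TaskExecutor would call onFatalError(...) here
        }
    }

    boolean hasFatalError() {
        return fatalError;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        RegistrationTimeoutSketch te = new RegistrationTimeoutSketch();
        te.startRegistrationTimeout(timer, 50);
        Thread.sleep(200); // never registers, so the timeout fires
        System.out.println("fatal error: " + te.hasFatalError());
        timer.shutdown();
    }
}
```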
TaskExecutor#onStart then calls TaskExecutor#startTaskExecutorServices.
This is where the TaskExecutor starts connecting to the ResourceManager in order to register its slots.
// Start the TaskExecutor services.
private void startTaskExecutorServices() throws Exception {
    try {
        // start by connecting to the ResourceManager
        // (non-HA setup: StandaloneLeaderRetrievalService#start)
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
        // tell the task slot table who's responsible for the task slot actions
        taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
        // start the job leader service
        jobLeaderService.start(
                getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
        fileCache =
                new FileCache(
                        taskManagerConfiguration.getTmpDirectories(),
                        blobCacheService.getPermanentBlobService());
    } catch (Exception e) {
        handleStartTaskExecutorServicesException(e);
    }
}
2.5. Registering with the ResourceManager
Registration with the ResourceManager is triggered by resourceManagerLeaderRetriever.start in TaskExecutor#startTaskExecutorServices.
Call chain:
resourceManagerLeaderRetriever.start
--> StandaloneLeaderRetrievalService#start
--> TaskExecutor#notifyLeaderAddress
--> TaskExecutor#notifyOfNewResourceManagerLeader
--> TaskExecutor#reconnectToResourceManager
--> TaskExecutor#tryConnectToResourceManager
--> TaskExecutor#connectToResourceManager
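The first hop in this chain works because a standalone (non-HA) leader retrieval service already knows the ResourceManager's address, so start() can notify the listener right away, and the listener then triggers the connection. A minimal sketch of that callback shape: `LeaderListener`, `FixedLeaderRetriever`, `ToyTaskExecutor` and the address string are hypothetical, and the real listener hands the call to the TaskExecutor's main thread before running notifyOfNewResourceManagerLeader.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class LeaderRetrievalSketch {
    // Hypothetical listener interface, shaped like a leader-retrieval callback.
    interface LeaderListener {
        void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
    }

    // Non-HA retriever: the leader is fixed when the service is created.
    static class FixedLeaderRetriever {
        private final String address;
        private final UUID sessionId;

        FixedLeaderRetriever(String address, UUID sessionId) {
            this.address = address;
            this.sessionId = sessionId;
        }

        void start(LeaderListener listener) {
            // no election to wait for: report the known leader immediately
            listener.notifyLeaderAddress(address, sessionId);
        }
    }

    // Stand-in for the TaskExecutor side; records each connection attempt.
    static class ToyTaskExecutor implements LeaderListener {
        final List<String> connectAttempts = new ArrayList<>();

        @Override
        public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
            // real chain: notifyOfNewResourceManagerLeader -> ... -> connectToResourceManager
            connectAttempts.add(leaderAddress);
        }
    }

    public static void main(String[] args) {
        ToyTaskExecutor te = new ToyTaskExecutor();
        new FixedLeaderRetriever("hypothetical-rm-address", UUID.randomUUID()).start(te);
        System.out.println(te.connectAttempts);
    }
}
```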
TaskExecutor#connectToResourceManager
private void connectToResourceManager() {
    // sanity checks
    assert (resourceManagerAddress != null);
    assert (establishedResourceManagerConnection == null);
    assert (resourceManagerConnection == null);
    log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
    // build the TaskExecutorRegistration sent to the ResourceManager
    final TaskExecutorRegistration taskExecutorRegistration =
            new TaskExecutorRegistration(
                    getAddress(),
                    getResourceID(),
                    unresolvedTaskManagerLocation.getDataPort(),
                    JMXService.getPort().orElse(-1),
                    hardwareDescription,
                    memoryConfiguration,
                    taskManagerConfiguration.getDefaultSlotResourceProfile(),
                    taskManagerConfiguration.getTotalResourceProfile());
    // create the connection to the ResourceManager
    resourceManagerConnection =
            new TaskExecutorToResourceManagerConnection(
                    log,
                    getRpcService(),
                    taskManagerConfiguration.getRetryingRegistrationConfiguration(),
                    resourceManagerAddress.getAddress(),
                    resourceManagerAddress.getResourceManagerId(),
                    getMainThreadExecutor(),
                    new ResourceManagerRegistrationListener(),
                    taskExecutorRegistration);
    // establish the connection
    resourceManagerConnection.start();
}
RegisteredRpcConnection#start establishes the connection.
// ------------------------------------------------------------------------
//  Life cycle
// ------------------------------------------------------------------------
public void start() {
    checkState(!closed, "The RPC connection is already closed");
    checkState(
            !isConnected() && pendingRegistration == null,
            "The RPC connection is already started");
    // create the registration (delegates to generateRegistration)
    final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
    if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
        // this call installed the registration, so it starts it
        newRegistration.startRegistration();
    } else {
        // concurrent start operation
        newRegistration.cancel();
    }
}
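REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration) guarantees that, if two threads call start() concurrently, only one registration is installed and started while the other cancels its own copy. The sketch below shows the same idea with an AtomicReference instead of an AtomicReferenceFieldUpdater; `PendingRegistration` is a hypothetical placeholder for RetryingRegistration.

```java
import java.util.concurrent.atomic.AtomicReference;

class SingleRegistrationSketch {
    // Hypothetical placeholder for RetryingRegistration.
    static class PendingRegistration {
        boolean started;
        boolean cancelled;

        void startRegistration() {
            started = true;
        }

        void cancel() {
            cancelled = true;
        }
    }

    private final AtomicReference<PendingRegistration> pending = new AtomicReference<>();

    // Returns the registration created by this call so callers can inspect it.
    PendingRegistration start() {
        PendingRegistration newRegistration = new PendingRegistration();
        if (pending.compareAndSet(null, newRegistration)) {
            newRegistration.startRegistration(); // this call won the race
        } else {
            newRegistration.cancel(); // another start() got there first
        }
        return newRegistration;
    }

    public static void main(String[] args) {
        SingleRegistrationSketch conn = new SingleRegistrationSketch();
        PendingRegistration first = conn.start();
        // a second call stands in for a concurrent one that lost the race
        PendingRegistration second = conn.start();
        System.out.println(first.started + " " + second.cancelled);
    }
}
```

A field updater saves one object per connection compared with an AtomicReference, which is presumably why Flink uses it; the race semantics are the same.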
RegisteredRpcConnection#createNewRegistration
// ------------------------------------------------------------------------
//  Internal methods
// ------------------------------------------------------------------------
private RetryingRegistration<F, G, S> createNewRegistration() {
    // create the registration via the subclass hook generateRegistration
    // (here: TaskExecutorToResourceManagerConnection#generateRegistration)
    RetryingRegistration<F, G, S> newRegistration = checkNotNull(generateRegistration());
    CompletableFuture<Tuple2<G, S>> future = newRegistration.getFuture();
    future.whenCompleteAsync(
            (Tuple2<G, S> result, Throwable failure) -> {
                if (failure != null) {
                    if (failure instanceof CancellationException) {
                        // we ignore cancellation exceptions because they originate from
                        // cancelling the RetryingRegistration
                        log.debug(
                                "Retrying registration towards {} was cancelled.",
                                targetAddress);
                    } else {
                        // this future should only ever fail if there is a bug, not if the
                        // registration is declined
                        onRegistrationFailure(failure);
                    }
                } else {
                    // registration succeeded
                    targetGateway = result.f0;
                    onRegistrationSuccess(result.f1);
                }
            },
            executor);
    return newRegistration;
}
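The whenCompleteAsync callback sorts the outcome into three cases: cancellation (only logged), any other failure (treated as a bug and passed to onRegistrationFailure), and success (store the gateway, then call onRegistrationSuccess). A minimal sketch of that triage with a plain CompletableFuture; `Outcome` and `attach` are hypothetical names, and a String stands in for the Tuple2 result.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

class RegistrationCallbackSketch {
    enum Outcome { CANCELLED, FAILED, SUCCEEDED }

    // Attaches the three-way triage to a registration future; the outcome is
    // recorded in the returned reference once the callback has run.
    static AtomicReference<Outcome> attach(CompletableFuture<String> future, Executor executor) {
        AtomicReference<Outcome> outcome = new AtomicReference<>();
        future.whenCompleteAsync(
                (String result, Throwable failure) -> {
                    if (failure != null) {
                        if (failure instanceof CancellationException) {
                            outcome.set(Outcome.CANCELLED); // registration cancelled: ignore
                        } else {
                            outcome.set(Outcome.FAILED); // would call onRegistrationFailure
                        }
                    } else {
                        outcome.set(Outcome.SUCCEEDED); // would call onRegistrationSuccess
                    }
                },
                executor);
        return outcome;
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // run the callback on the completing thread
        CompletableFuture<String> f = new CompletableFuture<>();
        AtomicReference<Outcome> o = attach(f, direct);
        f.cancel(false);
        System.out.println(o.get());
    }
}
```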
TaskExecutorToResourceManagerConnection#generateRegistration
Creates the registration:
@Override
protected RetryingRegistration<
                ResourceManagerId, ResourceManagerGateway, TaskExecutorRegistrationSuccess>
        generateRegistration() {
    // create a ResourceManagerRegistration (nested class of TaskExecutorToResourceManagerConnection)
    return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
            log,
            rpcService,
            getTargetAddress(),
            getTargetLeaderId(),
            retryingRegistrationConfiguration,
            taskExecutorRegistration);
}
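2.6. RetryingRegistration#startRegistration()
Once RegisteredRpcConnection#start has installed the new registration, it calls startRegistration, which:
- connect: resolves the target address into a callable RPC gateway
- register: registers with the ResourceManager through that gateway
- startRegistrationLater: retries after a delay if resolving the address failed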
/**
 * This method resolves the target address to a callable gateway and starts the registration
 * after that.
 */
@SuppressWarnings("unchecked")
public void startRegistration() {
    if (canceled) {
        // we already got canceled
        return;
    }
    try {
        // trigger resolution of the target address to a callable gateway
        final CompletableFuture<G> rpcGatewayFuture;
        if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
            rpcGatewayFuture =
                    (CompletableFuture<G>)
                            // connect: resolve the fenced gateway
                            rpcService.connect(
                                    targetAddress,
                                    fencingToken,
                                    targetType.asSubclass(FencedRpcGateway.class));
        } else {
            rpcGatewayFuture = rpcService.connect(targetAddress, targetType);
        }
        // upon success, start the registration attempts
        CompletableFuture<Void> rpcGatewayAcceptFuture =
                rpcGatewayFuture.thenAcceptAsync(
                        (G rpcGateway) -> {
                            log.info("Resolved {} address, beginning registration", targetName);
                            // register: first attempt, using the initial registration timeout
                            register(
                                    rpcGateway,
                                    1,
                                    retryingRegistrationConfiguration
                                            .getInitialRegistrationTimeoutMillis());
                        },
                        rpcService.getExecutor());
        // upon failure, retry, unless this is cancelled
        rpcGatewayAcceptFuture.whenCompleteAsync(
                (Void v, Throwable failure) -> {
                    if (failure != null && !canceled) {
                        final Throwable strippedFailure =
                                ExceptionUtils.stripCompletionException(failure);
                        if (log.isDebugEnabled()) {
                            log.debug(
                                    "Could not resolve {} address {}, retrying in {} ms.",
                                    targetName,
                                    targetAddress,
                                    retryingRegistrationConfiguration.getErrorDelayMillis(),
                                    strippedFailure);
                        } else {
                            log.info(
                                    "Could not resolve {} address {}, retrying in {} ms: {}",
                                    targetName,
                                    targetAddress,
                                    retryingRegistrationConfiguration.getErrorDelayMillis(),
                                    strippedFailure.getMessage());
                        }
                        // retry the whole resolve-and-register cycle after the error delay
                        startRegistrationLater(
                                retryingRegistrationConfiguration.getErrorDelayMillis());
                    }
                },
                rpcService.getExecutor());
    } catch (Throwable t) {
        completionFuture.completeExceptionally(t);
        cancel();
    }
}
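If resolving the gateway fails and the registration has not been cancelled, startRegistration schedules itself again through startRegistrationLater after getErrorDelayMillis(). The sketch below reproduces that resolve-or-retry loop with a ScheduledExecutorService; the resolver supplier, the delay value and the class name are hypothetical, and it leaves out fencing tokens and the register step.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetryResolveSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Supplier<CompletableFuture<String>> resolver; // stands in for rpcService.connect
    private final long errorDelayMillis;
    private volatile boolean canceled;
    final CompletableFuture<String> completionFuture = new CompletableFuture<>();
    final AtomicInteger attempts = new AtomicInteger();

    RetryResolveSketch(Supplier<CompletableFuture<String>> resolver, long errorDelayMillis) {
        this.resolver = resolver;
        this.errorDelayMillis = errorDelayMillis;
    }

    void startRegistration() {
        if (canceled) {
            return;
        }
        attempts.incrementAndGet();
        resolver.get().whenComplete((gateway, failure) -> {
            if (failure == null) {
                // the real code would now call register(gateway, 1, initialTimeout)
                completionFuture.complete(gateway);
                scheduler.shutdown();
            } else if (!canceled) {
                startRegistrationLater(errorDelayMillis);
            }
        });
    }

    void startRegistrationLater(long delayMillis) {
        scheduler.schedule(this::startRegistration, delayMillis, TimeUnit.MILLISECONDS);
    }

    void cancel() {
        canceled = true;
        completionFuture.cancel(false);
        scheduler.shutdown();
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // fails twice, then resolves
        RetryResolveSketch r = new RetryResolveSketch(() -> calls.incrementAndGet() < 3
                ? CompletableFuture.<String>failedFuture(new RuntimeException("not reachable yet"))
                : CompletableFuture.completedFuture("gateway"), 10);
        r.startRegistration();
        System.out.println(r.completionFuture.join() + " after " + r.attempts.get() + " attempts");
    }
}
```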
RetryingRegistration#register calls invokeRegistration to perform the actual registration; TaskExecutorToResourceManagerConnection implements that method.
The relevant fragment of RetryingRegistration#register:
// [Key point] register delegates the actual registration call to invokeRegistration,
// here TaskExecutorToResourceManagerConnection#invokeRegistration
CompletableFuture<RegistrationResponse> registrationFuture =
        invokeRegistration(gateway, fencingToken, timeoutMillis);
TaskExecutorToResourceManagerConnection#invokeRegistration
Registers the TaskExecutor with the ResourceManager:
// Called from RetryingRegistration#register for each registration attempt
@Override
protected CompletableFuture<RegistrationResponse> invokeRegistration(
        ResourceManagerGateway resourceManager,
        ResourceManagerId fencingToken,
        long timeoutMillis)
        throws Exception {
    Time timeout = Time.milliseconds(timeoutMillis);
    // RPC call to ResourceManager#registerTaskExecutor
    return resourceManager.registerTaskExecutor(taskExecutorRegistration, timeout);
}
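invokeRegistration receives a timeoutMillis argument, and startRegistration passes getInitialRegistrationTimeoutMillis() for attempt 1. As far as I can tell from RetryingRegistration, when an attempt times out the next attempt doubles the timeout, capped at getMaxRegistrationTimeoutMillis(); treat that as an assumption to verify against your Flink version. The helper below only computes the resulting schedule; its name and the 100 ms / 30 s inputs are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

class RegistrationTimeoutScheduleSketch {
    // Timeout used for each attempt, assuming "double on timeout, cap at max".
    static List<Long> timeouts(long initialMillis, long maxMillis, int attempts) {
        List<Long> result = new ArrayList<>();
        long timeout = initialMillis;
        for (int attempt = 1; attempt <= attempts; attempt++) {
            result.add(timeout);
            timeout = Math.min(2 * timeout, maxMillis);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [100, 200, 400, 800, 1600, 3200, 6400, 12800, 25600, 30000]
        System.out.println(timeouts(100, 30_000, 10));
    }
}
```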
ResourceManager#registerTaskExecutor
Registers the TaskExecutor with the ResourceManager:
- connect to the TaskExecutor's RPC gateway
- cache the pending connection
- once connected, perform the registration ==> ResourceManager#registerTaskExecutorInternal
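registerTaskExecutorInternal starts by replacing any earlier registration with the same ResourceID: the old entry is removed from the taskExecutors map and unregistered from the slot manager. The excerpt is cut off before the new registration is stored, so the final `put` below is an assumption. A minimal sketch of that bookkeeping with a HashMap; `ToySlotManager`, `Registration` and the String IDs are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReplaceRegistrationSketch {
    static final class Registration {
        final String resourceId;
        final String instanceId;

        Registration(String resourceId, String instanceId) {
            this.resourceId = resourceId;
            this.instanceId = instanceId;
        }
    }

    // Hypothetical slot manager that only records unregistrations.
    static final class ToySlotManager {
        final List<String> unregistered = new ArrayList<>();

        void unregisterTaskManager(String instanceId) {
            unregistered.add(instanceId);
        }
    }

    final Map<String, Registration> taskExecutors = new HashMap<>();
    final ToySlotManager slotManager = new ToySlotManager();

    void register(Registration newRegistration) {
        // drop any previous registration for the same ResourceID ...
        Registration old = taskExecutors.remove(newRegistration.resourceId);
        if (old != null) {
            // ... and make the slot manager forget its slots
            slotManager.unregisterTaskManager(old.instanceId);
        }
        // assumption: the new registration is stored afterwards
        taskExecutors.put(newRegistration.resourceId, newRegistration);
    }

    public static void main(String[] args) {
        ReplaceRegistrationSketch rm = new ReplaceRegistrationSketch();
        rm.register(new Registration("te-1", "instance-a"));
        rm.register(new Registration("te-1", "instance-b")); // same TaskExecutor re-registers
        System.out.println(rm.taskExecutors.get("te-1").instanceId + " " + rm.slotManager.unregistered);
    }
}
```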
/**
 * Registers a new TaskExecutor.
 *
 * @param taskExecutorRegistration task executor registration parameters
 * @return RegistrationResponse
 */
private RegistrationResponse registerTaskExecutorInternal(
        TaskExecutorGateway taskExecutorGateway,
        TaskExecutorRegistration taskExecutorRegistration) {
    // ResourceID of the registering TaskExecutor
    ResourceID taskExecutorResourceId = taskExecutorRegistration.getResourceId();
    // remove any cached registration for the same ResourceID
    WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
    if (oldRegistration != null) {
        // clean up the old registration
        // TODO :: suggest old taskExecutor to stop itself
        log.debug(
                "Replacing old registration of TaskExecutor {}.",
                taskExecutorResourceId.getStringWithMetadata());
        // remove old task manager registration from slot manager
        slotManager.unregisterTaskManager(
                oldRegistration.getInstanceID