[Flink] Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode, JobManager Startup (YarnJobClusterEntrypoint)
Posted by 九师兄
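1. Overview
Reposted from: Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode [Part 3]
Previous chapter: [Flink] Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode, YARN Submission Process
[Figure: overall flow diagram]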
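2. Code Analysis
In the previous chapter we saw that yarnClient.submitApplication(appContext); submits the job to the YARN cluster.
The command that runs inside the ApplicationMaster (AM) container is: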
$JAVA_HOME/bin/java
-Xmx1073741824
-Xms1073741824
-XX:MaxMetaspaceSize=268435456
-Dlog.file="<LOG_DIR>/jobmanager.log"
-Dlog4j.configuration=file:log4j.properties
-Dlog4j.configurationFile=file:log4j.properties
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
-D jobmanager.memory.off-heap.size=134217728b
-D jobmanager.memory.jvm-overhead.min=201326592b
-D jobmanager.memory.jvm-metaspace.size=268435456b
-D jobmanager.memory.heap.size=1073741824b
-D jobmanager.memory.jvm-overhead.max=201326592b 1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err
The entry class is org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint, so that is where we start reading.
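The -D arguments in the command above carry the JobManager memory settings in bytes. As a rough illustration only, the sketch below parses such "-D key=value" pairs and adds up the four memory components. The class name, helper methods, and the assumption that every value ends in "b" are hypothetical; this is not how Flink itself parses these options.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class JobManagerMemorySketch {

    // The dynamic properties copied from the launch command above.
    static final String[] LAUNCH_ARGS = {
        "-D", "jobmanager.memory.off-heap.size=134217728b",
        "-D", "jobmanager.memory.jvm-overhead.min=201326592b",
        "-D", "jobmanager.memory.jvm-metaspace.size=268435456b",
        "-D", "jobmanager.memory.heap.size=1073741824b",
        "-D", "jobmanager.memory.jvm-overhead.max=201326592b"
    };

    /** Parses "-D key=value" pairs; any other argument is ignored. */
    static Map<String, String> parseDynamicProperties(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        for (int i = 0; i + 1 < args.length; i++) {
            if ("-D".equals(args[i])) {
                String[] kv = args[i + 1].split("=", 2);
                if (kv.length == 2) {
                    props.put(kv[0], kv[1]);
                }
                i++; // skip the value that was just consumed
            }
        }
        return props;
    }

    /** Converts a value such as "134217728b" into a byte count (assumes a trailing "b"). */
    static long bytes(String value) {
        return Long.parseLong(value.substring(0, value.length() - 1));
    }

    /** Sums heap, off-heap, metaspace, and JVM overhead (min equals max in this command). */
    static long totalProcessBytes(Map<String, String> props) {
        return bytes(props.get("jobmanager.memory.heap.size"))
                + bytes(props.get("jobmanager.memory.off-heap.size"))
                + bytes(props.get("jobmanager.memory.jvm-metaspace.size"))
                + bytes(props.get("jobmanager.memory.jvm-overhead.max"));
    }

    public static void main(String[] args) {
        long total = totalProcessBytes(parseDynamicProperties(LAUNCH_ARGS));
        System.out.println(total / (1024 * 1024) + " MiB"); // prints "1600 MiB"
    }
}
```

With the values shown in the command, 1024 MiB heap + 128 MiB off-heap + 256 MiB metaspace + 192 MiB JVM overhead adds up to 1600 MiB for the whole process.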
2.1. YarnJobClusterEntrypoint#main
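The entry point of the AM process is the YarnJobClusterEntrypoint#main method, which does three things:
- Load the configuration
- Construct the YarnJobClusterEntrypoint
- Start it via ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);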
// ------------------------------------------------------------------------
//  The executable entry point for the Yarn Application Master Process
//  for a single Flink job.
// ------------------------------------------------------------------------
public static void main(String[] args) {
    // startup checks and logging
    EnvironmentInformation.logEnvironmentInfo(
            LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
    SignalHandler.register(LOG);
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    Map<String, String> env = System.getenv();

    final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
    Preconditions.checkArgument(
            workingDirectory != null,
            "Working directory variable (%s) not set",
            ApplicationConstants.Environment.PWD.key());

    try {
        YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
    } catch (IOException e) {
        LOG.warn("Could not log YARN environment information.", e);
    }

    // Parse the -D dynamic parameters passed on the command line
    final Configuration dynamicParameters =
            ClusterEntrypointUtils.parseParametersOrExit(
                    args,
                    new DynamicParametersConfigurationParserFactory(),
                    YarnJobClusterEntrypoint.class);

    // 1. Build the configuration
    final Configuration configuration =
            YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);

    // 2. Construct the YarnJobClusterEntrypoint
    YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(configuration);

    // 3. Start it
    ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
}
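Step 1 of main combines the configuration found in the working directory with the dynamic parameters from the command line. The sketch below illustrates that layering with plain maps, assuming that dynamic parameters take precedence over file values. ConfigLayeringSketch and its methods are hypothetical names, and the real YarnEntrypointUtils.loadConfiguration also derives further settings from the YARN environment, which is omitted here.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigLayeringSketch {

    /** Mirrors the PWD precondition in main: fail fast when the variable is missing. */
    static String requireWorkingDirectory(Map<String, String> env) {
        String workingDirectory = env.get("PWD");
        if (workingDirectory == null) {
            throw new IllegalArgumentException("Working directory variable (PWD) not set");
        }
        return workingDirectory;
    }

    /** Layers dynamic parameters on top of the file-based configuration (assumed precedence). */
    static Map<String, String> loadConfiguration(
            Map<String, String> fileConfig, Map<String, String> dynamicParameters) {
        Map<String, String> merged = new HashMap<>(fileConfig);
        merged.putAll(dynamicParameters); // later entries overwrite earlier ones
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> fileConfig = new HashMap<>();
        fileConfig.put("jobmanager.memory.heap.size", "512m");
        fileConfig.put("rest.port", "8081");

        Map<String, String> dynamicParameters = new HashMap<>();
        dynamicParameters.put("jobmanager.memory.heap.size", "1073741824b");

        Map<String, String> merged = loadConfiguration(fileConfig, dynamicParameters);
        System.out.println(merged.get("jobmanager.memory.heap.size")); // prints "1073741824b"
        System.out.println(merged.get("rest.port")); // prints "8081"
    }
}
```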
2.2. ClusterEntrypoint#runCluster
After ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint); is called, execution passes through several hops and finally reaches ClusterEntrypoint#runCluster, which starts the cluster.
Inside this method, the factory returned by createDispatcherResourceManagerComponentFactory is used, through its create method, to build the components inside the JobManager: Dispatcher / ResourceManager / JobMaster
// Call chain
ClusterEntrypoint#runClusterEntrypoint
  --> ClusterEntrypoint#startCluster();
    --> ClusterEntrypoint#runCluster(configuration, pluginManager);
private void runCluster(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    synchronized (lock) {
        // Initialize plugins...
        // Build the various services, e.g. heartbeat, RPC, ...
        initializeServices(configuration, pluginManager);

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        // Create the components inside the JobManager: Dispatcher / ResourceManager / JobMaster
        // Build the DispatcherResourceManagerComponentFactory
        final DispatcherResourceManagerComponentFactory
                dispatcherResourceManagerComponentFactory =
                        createDispatcherResourceManagerComponentFactory(configuration);

        // Build the cluster component: DispatcherResourceManagerComponent
        clusterComponent =
                dispatcherResourceManagerComponentFactory.create(
                        configuration,
                        ioExecutor,
                        commonRpcService,
                        haServices,
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
                        archivedExecutionGraphStore,
                        new RpcMetricQueryServiceRetriever(
                                metricRegistry.getMetricQueryServiceRpcService()),
                        this);

        // Shutdown handling...
        clusterComponent
                .getShutDownFuture()
                .whenComplete(
                        (ApplicationStatus applicationStatus, Throwable throwable) -> {
                            if (throwable != null) {
                                shutDownAsync(
                                        ApplicationStatus.UNKNOWN,
                                        ExceptionUtils.stringifyException(throwable),
                                        false);
                            } else {
                                // This is the general shutdown path. If a separate more
                                // specific shutdown was
                                // already triggered, this will do nothing
                                shutDownAsync(applicationStatus, null, true);
                            }
                        });
    }
}
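The last part of runCluster hooks a callback onto the component's shutdown future: a failure is reported as UNKNOWN, otherwise the status delivered by the future is passed on. A minimal sketch of that pattern with a plain CompletableFuture follows. The Status enum and the finalStatus method are hypothetical stand-ins for ApplicationStatus and shutDownAsync, not Flink code.

```java
import java.util.concurrent.CompletableFuture;

final class ShutdownFutureSketch {

    /** Hypothetical stand-in for Flink's ApplicationStatus. */
    enum Status { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    /** Maps the outcome of the component's shutdown future to the status used for shutdown. */
    static CompletableFuture<Status> finalStatus(CompletableFuture<Status> shutDownFuture) {
        CompletableFuture<Status> result = new CompletableFuture<>();
        shutDownFuture.whenComplete(
                (status, throwable) -> {
                    if (throwable != null) {
                        // The component failed: the real status is not known.
                        result.complete(Status.UNKNOWN);
                    } else {
                        // General shutdown path: forward the reported status.
                        result.complete(status);
                    }
                });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Status> regular = new CompletableFuture<>();
        CompletableFuture<Status> observed = finalStatus(regular);
        regular.complete(Status.SUCCEEDED);
        System.out.println(observed.join()); // prints "SUCCEEDED"

        CompletableFuture<Status> failing = new CompletableFuture<>();
        CompletableFuture<Status> observedFailure = finalStatus(failing);
        failing.completeExceptionally(new RuntimeException("component crashed"));
        System.out.println(observedFailure.join()); // prints "UNKNOWN"
    }
}
```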
2.3. DefaultDispatcherResourceManagerComponentFactory#create
The core services are built and started here, including the ResourceManager, Dispatcher, and JobManager, as well as the Web UI and History Server related services.
@Override
public DispatcherResourceManagerComponent create(
        Configuration configuration,
        Executor ioExecutor,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        ArchivedExecutionGraphStore archivedExecutionGraphStore,
        MetricQueryServiceRetriever metricQueryServiceRetriever,
        FatalErrorHandler fatalErrorHandler)
        throws Exception {

    LeaderRetrievalService dispatcherLeaderRetrievalService = null;
    LeaderRetrievalService resourceManagerRetrievalService = null;
    WebMonitorEndpoint<?> webMonitorEndpoint = null;
    ResourceManager<?> resourceManager = null;
    DispatcherRunner dispatcherRunner = null;

    try {
        // Dispatcher high-availability (leader retrieval)
        dispatcherLeaderRetrievalService =
                highAvailabilityServices.getDispatcherLeaderRetriever();

        // ResourceManager high-availability (leader retrieval)
        resourceManagerRetrievalService =
                highAvailabilityServices.getResourceManagerLeaderRetriever();

        // Dispatcher gateway retriever
        final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =
                new RpcGatewayRetriever<>(
                        rpcService,
                        DispatcherGateway.class,
                        DispatcherId::fromUuid,
                        new ExponentialBackoffRetryStrategy(
                                12, Duration.ofMillis(10), Duration.ofMillis(50)));

        // ResourceManager gateway retriever
        final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =
                new RpcGatewayRetriever<>(
                        rpcService,
                        ResourceManagerGateway.class,
                        ResourceManagerId::fromUuid,
                        new ExponentialBackoffRetryStrategy(
                                12, Duration.ofMillis(10), Duration.ofMillis(50)));

        // Build the executor used by the REST endpoint
        final ScheduledExecutorService executor =
                WebMonitorEndpoint.createExecutorService(
                        configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                        configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                        "DispatcherRestEndpoint");

        // 10000L by default
        final long updateInterval =
                configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
        final MetricFetcher metricFetcher =
                updateInterval == 0
                        ? VoidMetricFetcher.INSTANCE
                        : MetricFetcherImpl.fromConfiguration(
                                configuration,
                                metricQueryServiceRetriever,
                                dispatcherGatewayRetriever,
                                executor);

        // Web UI related service
        webMonitorEndpoint =
                restEndpointFactory.createRestEndpoint(
                        configuration,
                        dispatcherGatewayRetriever,
                        resourceManagerGatewayRetriever,
                        blobServer,
                        executor,
                        metricFetcher,
                        highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                        fatalErrorHandler);

        log.debug("Starting Dispatcher REST endpoint.");
        webMonitorEndpoint.start();

        // Get the host name
        final String hostname = RpcUtils.getHostname(rpcService);

        // Create the ResourceManager
        resourceManager =
                resourceManagerFactory.createResourceManager(
                        configuration,
                        ResourceID.generate(),
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        fatalErrorHandler,
                        new ClusterInformation(hostname, blobServer.getPort()),
                        webMonitorEndpoint.getRestBaseUrl(),
                        metricRegistry,
                        hostname,
                        ioExecutor);

        // History server related
        final HistoryServerArchivist historyServerArchivist =
                HistoryServerArchivist.createHistoryServerArchivist(
                        configuration, webMonitorEndpoint, ioExecutor);

        final PartialDispatcherServices partialDispatcherServices =
                new PartialDispatcherServices(
                        configuration,
                        highAvailabilityServices,
                        resourceManagerGatewayRetriever,
                        blobServer,
                        heartbeatServices,
                        () ->
                                MetricUtils.instantiateJobManagerMetricGroup(
                                        metricRegistry, hostname),
                        archivedExecutionGraphStore,
                        fatalErrorHandler,
                        historyServerArchivist,
                        metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
                        ioExecutor);

        // Create/start the Dispatcher: the dispatcher creates and starts the JobManager
        log.debug("Starting Dispatcher.");
        dispatcherRunner =
                dispatcherRunnerFactory.createDispatcherRunner(
                        highAvailabilityServices.getDispatcherLeaderElectionService(),
                        fatalErrorHandler,
                        new HaServicesJobGraphStoreFactory(highAvailabilityServices),
                        ioExecutor,
                        rpcService,
                        partialDispatcherServices);

        // Start the ResourceManager
        log.debug("Starting ResourceManager.");
        resourceManager.start();

        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

        return new DispatcherResourceManagerComponent(
                dispatcherRunner,
                DefaultResourceManagerService.createFor(resourceManager),
                dispatcherLeaderRetrievalService,
                resourceManagerRetrievalService,
                webMonitorEndpoint,
                fatalErrorHandler);

    } catch (Exception exception) {
        // clean up all started components
        if (dispatcherLeaderRetrievalService != null) {
            try {
                dispatcherLeaderRetrievalService.stop();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
        }

        if (resourceManagerRetrievalService != null) {
            try {
                resourceManagerRetrievalService.stop();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
        }

        final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

        if (webMonitorEndpoint != null) {
            terminationFutures.add(webMonitorEndpoint.closeAsync());
        }

        if (resourceManager != null) {
            terminationFutures.add(resourceManager.closeAsync());
        }

        if (dispatcherRunner != null) {
            terminationFutures.add(dispatcherRunner.closeAsync());
        }

        final FutureUtils.ConjunctFuture<Void> terminationFuture =
                FutureUtils.completeAll(terminationFutures);

        try {
            terminationFuture.get();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        throw new FlinkException(
                "Could not create the DispatcherResourceManagerComponent.", exception);
    }
}
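The catch block above follows a common cleanup pattern: every component that was already started is stopped, and any exception raised during cleanup is attached to the original failure as a suppressed exception instead of replacing it. The sketch below reproduces that idea with the JDK only. StartupCleanupSketch and cleanUp are hypothetical names, firstOrSuppressed is a simplified re-implementation of the helper's apparent behavior, and CompletableFuture.allOf is used as a stand-in for FutureUtils.completeAll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class StartupCleanupSketch {

    /** Keeps the earlier exception as the primary one and attaches the new one to it. */
    static <T extends Throwable> T firstOrSuppressed(T newException, T previous) {
        if (previous == null) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    /** Stops synchronous services, waits for asynchronous ones, and collects all failures. */
    static Exception cleanUp(
            Exception cause,
            List<AutoCloseable> startedServices,
            List<CompletableFuture<Void>> terminationFutures) {
        Exception exception = cause;
        for (AutoCloseable service : startedServices) {
            try {
                service.close();
            } catch (Exception e) {
                exception = firstOrSuppressed(e, exception);
            }
        }
        try {
            // Completes once every termination future has completed.
            CompletableFuture.allOf(terminationFutures.toArray(new CompletableFuture[0])).get();
        } catch (Exception e) {
            exception = firstOrSuppressed(e, exception);
        }
        return exception;
    }

    public static void main(String[] args) {
        List<AutoCloseable> services = new ArrayList<>();
        services.add(() -> { throw new IllegalStateException("retrieval service did not stop"); });

        List<CompletableFuture<Void>> futures = new ArrayList<>();
        futures.add(CompletableFuture.completedFuture(null));

        Exception result = cleanUp(new Exception("startup failed"), services, futures);
        System.out.println(result.getMessage()); // prints "startup failed"
        System.out.println(result.getSuppressed().length); // prints "1"
    }
}
```

The benefit of this shape is that the caller always sees the root cause of the failed startup first, with cleanup problems still visible in the stack trace as suppressed entries.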
3. Dispatcher
The Dispatcher serves two main purposes:
- Receiving user code
- Creating/starting the JobManager
It is created and managed through a DispatcherRunner:
dispatcherRunnerFactory.createDispatcherRunner(
        highAvailabilityServices.getDispatcherLeaderElectionService(),
        fatalErrorHandler,
        new HaServicesJobGraphStoreFactory(highAvailabilityServices),
        ioExecutor,
        rpcService,
        partialDispatcherServices);
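3.1. Construction
The calling entry point is DefaultDispatcherResourceManagerComponentFactory#create.
The DispatcherRunner is built through DefaultDispatcherRunnerFactory#createDispatcherRunner, and the concrete instance is produced by DefaultDispatcherRunner.create.
DefaultDispatcherResourceManagerComponentFactory#create
  --> DefaultDispatcherRunnerFactory#createDispatcherRunner
    --> DefaultDispatcherRunner.create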
3.2. Startup
Startup begins at DefaultDispatcherRunnerFactory#createDispatcherRunner and, after several hops, reaches DefaultDispatcherRunner#startNewDispatcherLeaderProcess and then JobDispatcherLeaderProcess#onStart.
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
    // Stop the previous DispatcherLeaderProcess
    stopDispatcherLeaderProcess();

    // Build the new DispatcherLeaderProcess
    dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);

    final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
    // Start it once the previous process has terminated
    FutureUtils.assertNoException(
            previousDispatcherLeaderProcessTerminationFuture.thenRun(
                    newDispatcherLeaderProcess::start));
}
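The key detail in startNewDispatcherLeaderProcess is that the new leader process is not started immediately: its start is chained onto the termination future of the previous process. A minimal single-threaded sketch of that hand-over follows. LeaderProcessHandoverSketch and the event strings are hypothetical, and only CompletableFuture#thenRun is taken from the snippet above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class LeaderProcessHandoverSketch {

    /** Records the order in which the old process terminates and the new one starts. */
    static List<String> handover() {
        List<String> events = new ArrayList<>();

        // Completed by the old process once it has fully shut down.
        CompletableFuture<Void> previousTermination = new CompletableFuture<>();

        // 1. Ask the old process to stop (it terminates asynchronously in the real code).
        events.add("old: stop requested");

        // 2. Register the start of the new process; nothing runs yet.
        previousTermination.thenRun(() -> events.add("new: started"));

        // 3. The old process finishes; completing the future triggers the callback.
        events.add("old: terminated");
        previousTermination.complete(null);

        return events;
    }

    public static void main(String[] args) {
        System.out.println(handover());
        // prints "[old: stop requested, old: terminated, new: started]"
    }
}
```

Chaining on the termination future means two leader processes never run at the same time, even though stopping the old one is asynchronous.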
This in turn calls the JobDispatcherLeaderProcess#onStart method:
@Override
protected void onStart() {
    // Create the dispatcher service for the single job of this per-job cluster
    final DispatcherGatewayService dispatcherService =
            dispatcherGatewayServiceFactory.create(
                    DispatcherId.fromUuid(getLeaderSessionId()),
                    Collections.singleton(jobGraph),
                    ThrowingJobGraphWriter.INSTANCE);

    completeDispatcherSetup(dispatcherService);
}
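In per-job mode, onStart hands the Dispatcher exactly one job graph, wrapped with Collections.singleton, together with a writer named ThrowingJobGraphWriter. The sketch below illustrates what such inputs look like, under the assumption (suggested by the name, not shown in the source above) that the writer rejects every attempt to store a job graph. GraphWriter, THROWING_WRITER, and the String job name are hypothetical simplifications.

```java
import java.util.Collection;
import java.util.Collections;

final class PerJobDispatcherInputSketch {

    /** Hypothetical, simplified stand-in for a job graph writer. */
    interface GraphWriter {
        void putJobGraph(String jobName);
    }

    /** Assumed behavior: a per-job cluster never persists additional job graphs. */
    static final GraphWriter THROWING_WRITER =
            jobName -> {
                throw new UnsupportedOperationException("Cannot store job graphs.");
            };

    /** The only "recovered" job is the one the cluster was started for. */
    static Collection<String> recoveredJobs(String jobGraph) {
        return Collections.singleton(jobGraph);
    }

    public static void main(String[] args) {
        Collection<String> jobs = recoveredJobs("my-job");
        System.out.println(jobs.size()); // prints "1"

        try {
            THROWING_WRITER.putJobGraph("another-job");
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```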
Finally, DefaultDispatcherGatewayServiceFactory#create creates and starts the Dispatcher:
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        JobGraphWriter jobGraphWriter) {

    // Declare the Dispatcher
    final Dispatcher dispatcher;
    try {
        // Create the Dispatcher
        dispatcher =
                dispatcherFactory.createDispatcher(
                        rpcService,
                        fencingToken,
                        recoveredJobs,
                        (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                new NoOpDispatcherBootstrap(),
                        PartialDispatcherServicesWithJobGraphStore.from(
                                partialDispatcherServices, jobGraphWriter));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
    }

    // Start the Dispatcher: Dispatcher#onStart
    // 1. Receive user jobs
    // 2. Start the JobMaster
    dispatcher.start();

    return DefaultDispatcherGatewayService.from(dispatcher);
}
4. JobManager
4.1. Startup
The JobManager is started with the Dispatcher's onStart method as the entry point.
The method that gets executed is Dispatcher#startRecoveredJobs:
private void startRecoveredJobs() {
    // Process the jobs that need to be recovered
    for (