Flink源码四YarnJobClusterEntrypoint

Posted 持枢

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink源码四YarnJobClusterEntrypoint相关的知识,希望对你有一定的参考价值。

一、YarnJobClusterEntrypoint

进入main方法

SignalHandler.register(LOG);
		JvmShutdownSafeguard.installAsShutdownHook(LOG);

		Map<String, String> env = System.getenv();

		final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
		Preconditions.checkArgument(
			workingDirectory != null,
			"Working directory variable (%s) not set",
			ApplicationConstants.Environment.PWD.key());

		try {
			YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
		} catch (IOException e) {
			LOG.warn("Could not log YARN environment information.", e);
		}

		final Configuration dynamicParameters = ClusterEntrypointUtils.parseParametersOrExit(
			args,
			new DynamicParametersConfigurationParserFactory(),
			YarnJobClusterEntrypoint.class);
		final Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);

		YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(configuration);
          //执行程序的入口
		ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);

  

ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);

clusterEntrypoint.startCluster();

securityContext.runSecured((Callable<Void>) () -> {
runCluster(configuration, pluginManager);

return null;
});


synchronized (lock) {
  //初始化服务rpc相关
initializeServices(configuration, pluginManager);

// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

//创建ResourceManage,创建、启动Dispatcher,启动ResourceManage
clusterComponent = dispatcherResourceManagerComponentFactory.create(
configuration,
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
this);

clusterComponent.getShutDownFuture().whenComplete(
(ApplicationStatus applicationStatus, Throwable throwable) -> {
if (throwable != null) {
shutDownAsync(
ApplicationStatus.UNKNOWN,
ExceptionUtils.stringifyException(throwable),
false);
} else {
// This is the general shutdown path. If a separate more specific shutdown was
// already triggered, this will do nothing
shutDownAsync(
applicationStatus,
null,
true);
}
});
}

  创建ResourceMange、Dispatcher,并启动

clusterComponent = dispatcherResourceManagerComponentFactory.create(
				configuration,
				ioExecutor,
				commonRpcService,
				haServices,
				blobServer,
				heartbeatServices,
				metricRegistry,
				archivedExecutionGraphStore,
				new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
				this); 

具体实现:网页的打开

webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
				configuration,
				dispatcherGatewayRetriever,
				resourceManagerGatewayRetriever,
				blobServer,
				executor,
				metricFetcher,
				highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
				fatalErrorHandler);

			log.debug("Starting Dispatcher REST endpoint.");
			webMonitorEndpoint.start();

 resourceManage的启动

resourceManager = resourceManagerFactory.createResourceManager(
				configuration,
				ResourceID.generate(),
				rpcService,
				highAvailabilityServices,
				heartbeatServices,
				fatalErrorHandler,
				new ClusterInformation(hostname, blobServer.getPort()),
				webMonitorEndpoint.getRestBaseUrl(),
				metricRegistry,
				hostname,
				ioExecutor);

  创建Dispatcher

dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
				highAvailabilityServices.getDispatcherLeaderElectionService(),
				fatalErrorHandler,
				new HaServicesJobGraphStoreFactory(highAvailabilityServices),
				ioExecutor,
				rpcService,
				partialDispatcherServices);

  选举服务:每个组件都有选举服务,最终要调用这个

contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);

  具体实现:

 

 最终执行这个东西:lamda表达式

previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));

  start的实现

 

 

runIfStateIs(
			State.CREATED,
			this::startInternal);

  

final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create(
			DispatcherId.fromUuid(getLeaderSessionId()),
			Collections.singleton(jobGraph),
			ThrowingJobGraphWriter.INSTANCE);

  最终启动dispatcher,并启动

final Dispatcher dispatcher;
		try {
			dispatcher = dispatcherFactory.createDispatcher(
				rpcService,
				fencingToken,
				recoveredJobs,
				(dispatcherGateway, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(),
				PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter));
		} catch (Exception e) {
			throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
		}

		dispatcher.start();

  最终rpc调用,akka组件通信onStart方法

rpcServer.start();

  

 

以上是关于Flink源码四YarnJobClusterEntrypoint的主要内容,如果未能解决你的问题,请参考以下文章

Flink1.15源码解析--启动JobManager

Flink1.15源码解析--启动JobManager

Flink SQL Window源码全解析

flink exectly-once系列之两阶段提交概述

flinkFlink 1.12.2 源码浅析 : yarn-per-job模式解析 TaskMasger 启动

Flink源码系列Flink 源码分析之 Client 端启动流程分析