2. spark-2.4.6 Source Code Analysis (YARN cluster mode) - Starting the YARN Client and Submitting the ApplicationMaster
Posted by Leo Han
In the previous section we saw that, after a job is submitted, Spark ends up invoking org.apache.spark.deploy.yarn.Client.run(). This is where Spark uses YARN's client API to submit the application to the cluster. Let's walk through that process:
def run(): Unit = {
  this.appId = submitApplication()
  if (!launcherBackend.isConnected() && fireAndForget) {
    val report = getApplicationReport(appId)
    val state = report.getYarnApplicationState
    logInfo(s"Application report for $appId (state: $state)")
    logInfo(formatReportDetails(report))
    if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
      throw new SparkException(s"Application $appId finished with status: $state")
    }
  } else {
    val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
    if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
      diags.foreach { err =>
        logError(s"Application diagnostics message: $err")
      }
      ...
    }
  }
}
As we can see, the client first uses YarnClient to submit the application to YARN and obtain an application ID. The process is as follows:
def submitApplication(): ApplicationId = {
  var appId: ApplicationId = null
  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()

    new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)

    appId
  } catch {...
}
submitApplication() first connects the LauncherBackend, which opens a TCP connection back to the process that launched the application (when that process used the SparkLauncher API), so that application state can be reported to it. Once connected, the client creates a new YARN application and then builds the ContainerLaunchContext for the ApplicationMaster.
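For context, the other end of that LauncherBackend connection is the org.apache.spark.launcher.SparkLauncher API in the parent process. A minimal sketch of driving a submission through it (the launcher classes and methods are the real API; the jar path and main class are placeholder assumptions):

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

// Sketch: launch a Spark app as a child process and track its state.
// The child's LauncherBackend reports back over the TCP socket described above.
val handle = new SparkLauncher()
  .setAppResource("/path/to/my-app.jar")  // placeholder jar path
  .setMainClass("com.example.MyApp")      // placeholder main class
  .setMaster("yarn")
  .setDeployMode("cluster")
  .startApplication(new SparkAppHandle.Listener {
    override def stateChanged(h: SparkAppHandle): Unit =
      println(s"state changed: ${h.getState}")
    override def infoChanged(h: SparkAppHandle): Unit =
      println(s"app id: ${h.getAppId}")
  })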
Back to the submission path: the ContainerLaunchContext handed to YARN is assembled in the createContainerLaunchContext method:
val amClass =
  if (isClusterMode) {
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }
if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
  args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
}
val userArgs = args.userArgs.flatMap { arg =>
  Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
}
val amArgs =
  Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
  Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))

// Command for the ApplicationMaster
val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++ amArgs ++
  Seq(
    "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
    "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands.asJava)
As we can see, in cluster mode the class that YARN's ResourceManager is asked to launch is org.apache.spark.deploy.yarn.ApplicationMaster (in client mode it is ExecutorLauncher).
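Putting the pieces above together, the command written into the AM container's launch script has roughly the following shape in cluster mode (illustrative only: the actual javaOpts, user class, jar, and log directory are filled in per job and per cluster; com.example.MyApp and my-app.jar are placeholders):

{{JAVA_HOME}}/bin/java -server <javaOpts> \
  org.apache.spark.deploy.yarn.ApplicationMaster \
  --class com.example.MyApp --jar my-app.jar \
  --properties-file {{PWD}}/__spark_conf__/__spark_conf__.properties \
  1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr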
In addition, in createApplicationSubmissionContext the ApplicationSubmissionContext obtained from the new application is filled in with the memory and CPU capability of the container to request. Note that in cluster mode the memory and CPU requested here belong to the driver (which runs inside the ApplicationMaster container), not to the executors; if spark.driver.cores is not set, the CPU count defaults to 1.
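The capability setup inside createApplicationSubmissionContext boils down to something like the following sketch (simplified from Spark's Client.scala; the concrete numbers are placeholder assumptions, and in cluster mode they come from spark.driver.memory, the memory overhead, and spark.driver.cores):

import org.apache.hadoop.yarn.api.records.Resource
import org.apache.hadoop.yarn.util.Records

// Sketch: the AM container capability; values are illustrative.
val amMemory = 1024          // MB, from spark.driver.memory in cluster mode
val amMemoryOverhead = 384   // MB, driver memory overhead
val amCores = 1              // spark.driver.cores, defaults to 1

val capability = Records.newRecord(classOf[Resource])
capability.setMemory(amMemory + amMemoryOverhead)  // total MB for the AM container
capability.setVirtualCores(amCores)
// appContext.setResource(capability) attaches this to the submission context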
Returning to the run method: since the LauncherBackend has already connected, the fire-and-forget condition is false and execution enters the else branch:
val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
  diags.foreach { err =>
    logError(s"Application diagnostics message: $err")
  }
  throw new SparkException(s"Application $appId finished with failed status")
}
if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
  throw new SparkException(s"Application $appId is killed")
}
if (finalState == FinalApplicationStatus.UNDEFINED) {
  throw new SparkException(s"The final status of application $appId is undefined")
}
monitorApplication, meanwhile, spins in a while loop, repeatedly fetching the application report from YARN to monitor the application until it reaches a terminal state.
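A simplified sketch of that polling pattern, assuming the standard YarnClient API and the yarnClient/appId values from the code above (the real method also prints the full report and takes its poll interval from spark.yarn.report.interval):

import org.apache.hadoop.yarn.api.records.YarnApplicationState

// Minimal sketch, not the verbatim Spark implementation: poll the report
// until the application reaches a terminal YARN state.
var state = YarnApplicationState.NEW
while (state != YarnApplicationState.FINISHED &&
    state != YarnApplicationState.FAILED &&
    state != YarnApplicationState.KILLED) {
  Thread.sleep(1000)  // default report interval is 1s
  val report = yarnClient.getApplicationReport(appId)
  state = report.getYarnApplicationState
  println(s"Application report for $appId (state: $state)")
}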
At this point the YARN client side has finished its work; in the next section we will analyze how the ApplicationMaster starts up.
Other posts in this series:

1. spark-2.4.6 Source Code Analysis (YARN cluster mode) - Job Submission
3. spark-2.4.6 Source Code Analysis (YARN cluster mode) - YARN Container Startup: CoarseGrainedExecutorBackend
4. spark-2.4.6 Source Code Analysis (YARN cluster mode) - SparkContext Startup
5. spark-2.4.6 Source Code Analysis (YARN cluster mode) - Job Submission, Stage Division and Stage Submission