1. spark-2.4.6 Source Code Analysis (YARN cluster mode) - Job Submission

Posted by Leo Han


First, a note: this series is based on the spark-2.4.6 source code.
When submitting a job with spark-submit, we usually use a command of the following form:

spark-submit \
    --class xxxxx \
    --name 'test_xxxx' \
    --master yarn-cluster \
    --queue yarn-test \
    --principal ad-bigdata-test --keytab 'xxxx.keytab' \
    --num-executors 30 \
    --driver-memory 8g \
    --executor-memory 40g \
    --executor-cores 20 \
    --conf spark.task.maxFailures=0 \
    --conf spark.memory.fraction=0.8 \
    --conf spark.storage.memoryFraction=0.2 \
    --conf spark.default.parallelism=600 \
    --conf spark.sql.shuffle.partitions=2400 \
    --conf spark.yarn.executor.memoryOverhead=2048 \
    --conf spark.executor.heartbeatInterval=100
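
Note that the yarn-cluster master value has been deprecated since Spark 2.0: spark-submit rewrites it internally to master yarn with deploy mode cluster, so the equivalent non-deprecated form of the command above would be:

spark-submit \
    --master yarn \
    --deploy-mode cluster \
    ...    # remaining options as above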

After the command is submitted, SparkSubmit is the entry point that receives the command-line request:

def doSubmit(args: Array[String]): Unit = {
    val uninitLog = initializeLogIfNecessary(true, silent = true)
    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }

doSubmit first calls parseArguments to process the command-line arguments, which are parsed into a SparkSubmitArguments instance; for a SUBMIT action, submit() eventually invokes the runMain method:

private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    // Let the main class re-initialize the logging system once it starts.
    if (uninitLog) {
      Logging.uninitialize()
    }

    if (args.verbose) {
      logInfo(s"Main class:\\n$childMainClass")
      logInfo(s"Arguments:\\n${childArgs.mkString("\\n")}")
      // sysProps may contain sensitive information, so redact before printing
      logInfo(s"Spark config:\\n${Utils.redact(sparkConf.getAll.toMap).mkString("\\n")}")
      logInfo(s"Classpath elements:\\n${childClasspath.mkString("\\n")}")
      logInfo("\\n")
    }

    val loader =
      if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
        new ChildFirstURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      } else {
        new MutableURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      }
    Thread.currentThread.setContextClassLoader(loader)

    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }

    var mainClass: Class[_] = null

    try {
      mainClass = Utils.classForName(childMainClass)
    } catch {
      case e: ClassNotFoundException =>
        logWarning(s"Failed to load $childMainClass.", e)
        if (childMainClass.contains("thriftserver")) {
          logInfo(s"Failed to load main class $childMainClass.")
          logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
        }
        throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
      case e: NoClassDefFoundError =>
        logWarning(s"Failed to load $childMainClass: ${e.getMessage()}")
        if (e.getMessage.contains("org/apache/hadoop/hive")) {
          logInfo(s"Failed to load hive class.")
          logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
        }
        throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
    }

    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.newInstance().asInstanceOf[SparkApplication]
    } else {
      // SPARK-4170
      if (classOf[scala.App].isAssignableFrom(mainClass)) {
        logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
      }
      new JavaMainApplication(mainClass)
    }

    // ...
    try {
      app.start(childArgs.toArray, sparkConf)
    } catch {
      case t: Throwable =>
        throw findCause(t)  // findCause (defined earlier in runMain) unwraps reflection exceptions
    }
  }
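
To make the dispatch at the end of runMain concrete, here is a minimal, self-contained sketch of the same pattern outside of Spark (AppLike, MyApp and ReflectiveLaunch are made-up names for illustration, not Spark classes): a class is loaded by name, instantiated directly if it implements a known application trait, and otherwise launched by invoking its static main(String[]) via reflection, which is essentially what JavaMainApplication does for ordinary main-method classes.

trait AppLike {
  def start(args: Array[String]): Unit
}

class MyApp extends AppLike {
  override def start(args: Array[String]): Unit =
    println(s"started with ${args.mkString(" ")}")
}

object ReflectiveLaunch {
  def launch(className: String, args: Array[String]): Unit = {
    val clazz = Class.forName(className)
    if (classOf[AppLike].isAssignableFrom(clazz)) {
      // The class implements our application trait: instantiate it and call start directly,
      // analogous to how runMain handles SparkApplication subclasses such as YarnClusterApplication.
      clazz.getConstructor().newInstance().asInstanceOf[AppLike].start(args)
    } else {
      // Otherwise treat it as a plain "main" class and invoke main(String[]) via reflection,
      // analogous to JavaMainApplication.
      val mainMethod = clazz.getMethod("main", classOf[Array[String]])
      mainMethod.invoke(null, args)
    }
  }

  def main(args: Array[String]): Unit =
    launch("MyApp", Array("--class", "xxxxx"))
}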

In prepareSubmitEnvironment, the submitted arguments are used to prepare everything needed to launch the Spark application. Among other things, the method determines the run mode; for yarn cluster mode, the corresponding launch class is:

org.apache.spark.deploy.yarn.YarnClusterApplication

So what we get here is in fact YarnClusterApplication. prepareSubmitEnvironment returns a tuple:

 val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

Here childMainClass is YarnClusterApplication, and childArgs packages the parsed SparkSubmitArguments into a Seq that will later be handed to YarnClusterApplication. In particular, the class we passed on the command line with --class is forwarded as follows:

childArgs += ("--class", args.mainClass)
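
For context, the yarn cluster branch inside prepareSubmitEnvironment looks roughly like the following (an abridged sketch of the 2.4.x code; the Python/R branches and some details are omitted):

if (isYarnCluster) {
  // YARN_CLUSTER_SUBMIT_CLASS is "org.apache.spark.deploy.yarn.YarnClusterApplication"
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
  if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
    childArgs += ("--jar", args.primaryResource)
  }
  childArgs += ("--class", args.mainClass)
  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  }
}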

runMain then checks whether the childMainClass returned by prepareSubmitEnvironment is a subclass of SparkApplication, and YarnClusterApplication is.
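
For reference, the SparkApplication interface is just a single start method (declared in org.apache.spark.deploy.SparkApplication):

private[spark] trait SparkApplication {
  def start(args: Array[String], conf: SparkConf): Unit
}

YarnClusterApplication implements it as follows: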

private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // In yarn cluster mode, jars and files are distributed through the YARN distributed
    // cache, so they are removed from SparkConf here to avoid shipping them twice.
    conf.remove("spark.jars")
    conf.remove("spark.files")

    new Client(new ClientArguments(args), conf).run()
  }

}

YarnClusterApplication is then started. As shown above, its start method simply constructs an org.apache.spark.deploy.yarn.Client and calls its run() method, which we will cover in the next post. From this point on it is the YARN client submitting the application, as described earlier in: YARN Overall Architecture and Client Programming.

Other posts in this series:

2. spark-2.4.6 Source Code Analysis (YARN cluster mode) - YARN Client Startup and ApplicationMaster Submission

3. spark-2.4.6 Source Code Analysis (YARN cluster mode) - YARN Container Startup: CoarseGrainedExecutorBackend

4. spark-2.4.6 Source Code Analysis (YARN cluster mode) - SparkContext Startup

5. spark-2.4.6 Source Code Analysis (YARN cluster mode) - Job Submission, Stage Division and Stage Submission

7. Spark Source Code Analysis (YARN cluster mode) - Task Division and Submission

8. Spark Source Code Analysis (YARN cluster mode) - Task Execution and Map-side Write Implementation