spark提交参数解析

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark提交参数解析相关的知识,希望对你有一定的参考价值。

参考技术A 上一节学习了 Spark 源码的编译方法,这一节我们跟踪一下spark-shell的启动.

spark-shell是spark提供一个控制台,通过它我们可以方便的学习spark的API,类似于 Scala 的REPL.

spark-shell在/bin目录下,下面简单分析一下spark-shell的启动。

spark-shell--------->spark-submit------------->spark-class

这是spark-shell启动时依次调用的三个shell脚本文件,最终在spark-class脚本里加载主类并执行。

1.spark-shell脚本主要做的工作是收集spark-submit相关参数,最后调用spark-submit

/bin/spark-submit --class org.apache.spark.repl.Main "$SUBMISSION_OPTS[@]" spark-shell "$APPLICATION_OPTS[@]

这是最终执行的命令,可以看到主要是收集submit参数和application参数到SUBMISSION_OPTS和APPLICATION_OPTS里,并传递给spark-submit.

2.spark-submit根据上一个脚本收集到的参数设置一些必要的环境变量,最后调用spark-class

exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$ORIG_ARGS[@]"

可以看到在spark-shell传递的参数基础上,加上了主类org.apache.spark.deploy.SparkSubmit.这是真正的入口类。

3.spark-class主要是准备JVM环境相关的参数,并最终执行

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"

其中RUNNER是$JAVA_HOME/bin/ Java

$CLASSPATH是spark-class收集的必要的CLASSPATH信息,包括设置spark自己的相关classpath。将所有参数展开后实际最终执行的命令是:

/usr/lib/java/jdk1.7.0_15/bin/java-cp/usr/local/spark/src/spark-1.3.1/spark-1.3.1/conf:/usr/local/spark/src/spark-1.3.1/spark-1.3.1/assembly/target/scala-2.10/spark-assembly-1.3.1-hadoop2.4.0.jar:/usr/local/spark/src/spark-1.3.1/spark-1.3.1/lib_managed/jars/datanucleus-core-3.2.10.jar:/usr/local/spark/src/spark-1.3.1/spark-1.3.1/lib_managed/jars/datanucleus-rdbms-3.2.9.jar:/usr/local/spark/src/spark-1.3.1/spark-1.3.1/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar-XX:MaxPermSize=128m   -Dscala.usejavacp=true -Xms512m -Xmx512morg.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main spark-shell

可以看到,红色部分是CLASSPATH,绿色部分是JVM配置,紫色部分是最终加载的类和参数。

spark-shell启动分为三个脚本文件,体现了模块化的思想。不同脚本做不同方面的事,这样方便调试 测试 和修改。

我们在spark-class里稍做修改,在最终执行命令行前,加入以下内容:

export JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS -Dcom.sun.management.jmxremote.port=8300 -Dcom.sun.management.jmxremote.authenticate=false

-Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1"

配置这些信息,是为了使用jvisualvm跟踪调试。

然后启动spark-shell.启动完成后,启动$JAVA_HOME/bin/jvisualvm,可以看到spark-shell启动的栈。

Spark技术内幕: Task向Executor提交的源代码解析

在上文《Spark技术内幕:Stage划分及提交源代码分析》中,我们分析了Stage的生成和提交。可是Stage的提交,仅仅是DAGScheduler完毕了对DAG的划分,生成了一个计算拓扑,即须要依照顺序计算的Stage,Stage中包括了能够以partition为单位并行计算的Task。我们并没有分析Stage中得Task是怎样生成而且终于提交到Executor中去的。

这就是本文的主题。

从org.apache.spark.scheduler.DAGScheduler#submitMissingTasks開始,分析Stage是怎样生成TaskSet的。

假设一个Stage的全部的parent stage都已经计算完毕或者存在于cache中。那么他会调用submitMissingTasks来提交该Stage所包括的Tasks。

org.apache.spark.scheduler.DAGScheduler#submitMissingTasks的计算流程例如以下:

  1. 首先得到RDD中须要计算的partition,对于Shuffle类型的stage。须要推断stage中是否缓存了该结果;对于Result类型的Final Stage。则推断计算Job中该partition是否已经计算完毕。
  2. 序列化task的binary。Executor能够通过广播变量得到它。每一个task执行的时候首先会反序列化。这样在不同的executor上执行的task是隔离的,不会相互影响。
  3. 为每一个须要计算的partition生成一个task:对于Shuffle类型依赖的Stage,生成ShuffleMapTask类型的task;对于Result类型的Stage,生成一个ResultTask类型的task
  4. 确保Task是能够被序列化的。由于不同的cluster有不同的taskScheduler,在这里推断能够简化逻辑。保证TaskSet的task都是能够序列化的
  5. 通过TaskScheduler提交TaskSet。

TaskSet就是能够做pipeline的一组全然同样的task,每一个task的处理逻辑全然同样。不同的是处理数据。每一个task负责处理一个partition。

pipeline。能够称为大数据处理的基石。仅仅有数据进行pipeline处理,才干将其放到集群中去执行。

对于一个task来说,它从数据源获得逻辑。然后依照拓扑顺序,顺序执行(实际上是调用rdd的compute)。

TaskSet是一个数据结构,存储了这一组task:
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val attempt: Int,
    val priority: Int,
    val properties: Properties) {
    val id: String = stageId + "." + attempt

  override def toString: String = "TaskSet " + id
}


管理调度这个TaskSet的时org.apache.spark.scheduler.TaskSetManager。TaskSetManager会负责task的失败重试。跟踪每一个task的执行状态。处理locality-aware的调用。
具体的调用堆栈例如以下:
  1. org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks
  2. org.apache.spark.scheduler.SchedulableBuilder#addTaskSetManager
  3. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers
  4. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#makeOffers
  5. org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers
  6. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#launchTasks
  7. org.apache.spark.executor.CoarseGrainedExecutorBackend.receiveWithLogging#launchTask
  8. org.apache.spark.executor.Executor#launchTask

首先看一下org.apache.spark.executor.Executor#launchTask:
  def launchTask(
      context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
    val tr = new TaskRunner(context, taskId, taskName, serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr) // 開始在executor中执行
  }


TaskRunner会从序列化的task中反序列化得到task。这个须要看 org.apache.spark.executor.Executor.TaskRunner#run 的实现:task.run(taskId.toInt)。而task.run的实现是:
 final def run(attemptId: Long): T = {
    context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
    context.taskMetrics.hostname = Utils.localHostName()
    taskThread = Thread.currentThread()
    if (_killed) {
      kill(interruptThread = false)
    }
    runTask(context)
  }

对于原来提到的两种Task,即
  1.  org.apache.spark.scheduler.ShuffleMapTask
  2.  org.apache.spark.scheduler.ResultTask
分别实现了不同的runTask:
org.apache.spark.scheduler.ResultTask#runTask即顺序调用rdd的compute,通过rdd的拓扑顺序依次对partition进行计算:
  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    try {
      func(context, rdd.iterator(partition, context))
    } finally {
      context.markTaskCompleted()
    }
  }


而org.apache.spark.scheduler.ShuffleMapTask#runTask则是写shuffle的结果。

  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
      //此处的taskBinary即为在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的广播变量取得的

    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) // 将rdd计算的结果写入memory或者disk
      return writer.stop(success = true).get
    } catch {
      case e: Exception =>
        if (writer != null) {
          writer.stop(success = false)
        }
        throw e
    } finally {
      context.markTaskCompleted()
    }
  }


这两个task都不要依照拓扑顺序调用rdd的compute来完毕对partition的计算。不同的是ShuffleMapTask须要shuffle write。以供child stage读取shuffle的结果。

对于这两个task都用到的taskBinary,即为在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的广播变量取得的。


通过上述几篇博文,实际上我们已经粗略的分析了从用户定义SparkContext開始。集群是假设为每一个Application分配Executor的,回想一下这个序列图:
技术分享图片
还有就是用户触发某个action,集群是怎样生成DAG,假设将DAG划分为能够成Stage,已经Stage是怎样将这些能够pipeline执行的task提交到Executor去执行的。当然了,具体细节还是很值得推敲的。

以后的每一个周末。都会奉上某个细节的实现。

歇息了。明天又会開始忙碌的一周。



以上是关于spark提交参数解析的主要内容,如果未能解决你的问题,请参考以下文章

spark on yarn的提交过程

spark on yarn的提交过程

Spark集群任务提交流程----2.1.0源码解析

使用SparkR的Sparklyr:麻烦解析函数参数

Scala零基础教学61-80

ChunJun任务提交-源码分析