深入理解spark-DAGscheduler源码分析(下)

Posted yankang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解spark-DAGscheduler源码分析(下)相关的知识,希望对你有一定的参考价值。

 

上篇中已经分析了DAGscheduler的监听机制,以及job的划分,这次我们再来看一看stage是如何划分以及stage的最终提交;

 

当jobsubmit 加入到DAGscheduler的event队列中的时候,

就会将job的stage划分为resultstage 和 shufflestage,其中一个job只会有一个resultstage;

 

DAGScheduler#handleJobSubmitted

 stage的划分上,首先从最后一个stage开始,最先创建一个resultstage,然后依次向前递归实现stage的划分。

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      // Stage划分过程是从最后一个Stage开始往前执行的,最后一个Stage的类型是ResultStage
      finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    //为该Job生成一个ActiveJob对象,并准备计算这个finalStage
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job // 该job进入active状态
    activeJobs += job
    finalStage.setActiveJob(job) 
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post( // 向LiveListenerBus发送Job提交事件
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)   //提交当前Stage

    submitWaitingStages()
  }

  

 

DAGScheduler#newResultStage

在划分中,根据创建的resultstage,去获取result的parentstage进行递归调用;

private def newResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    // 获取当前Stage的parent Stage,这个方法是划分Stage的核心实现 (递归调用实现)
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
    val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)// 创建当前最后的ResultStage
    stageIdToStage(id) = stage // 将ResultStage与stageId相关联
    updateJobIdStageIdMaps(jobId, stage) // 更新该job中包含的stage
    stage
  }

 

DAGScheduler#getParentStagesAndId

递归调用的终点,获取parentstage 和 stageid 的结果返回,由于这个是由后向前的递归调用(使用广度优先策略),那么最先执行的stageid 则是最小的0

private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
    val parentStages = getParentStages(rdd, firstJobId) // 传入rdd和jobId,生成parentStage
    // 生成当前stage的stageId。同一Application中Stage初始编号为0
    val id = nextStageId.getAndIncrement() 
    (parentStages, id)
  }

  

DAGScheduler#getParentStages

private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    val parents = new HashSet[Stage] // 存储当前stage的所有parent stage
    val visited = new HashSet[RDD[_]] // 存储访问过的rdd
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]] // 一个栈,保存未访问过的rdd,先进后出
    def visit(r: RDD[_]) {
      if (!visited(r)) { // 如果栈中弹出的rdd被未访问过
        visited += r // 首先将其标记为已访问
        // Kind of ugly: need to register RDDs with the cache here since
        // we can‘t do it in its constructor because # of partitions is unknown
        for (dep <- r.dependencies) { // 读取当然rdd的依赖
          dep match {
            case shufDep: ShuffleDependency[_, _, _] => // 如果是宽依赖,则获取依赖rdd所在的ShuffleMapStage
              parents += getShuffleMapStage(shufDep, firstJobId)
            case _ =>
              // 如果是窄依赖,将依赖的rdd也压入栈中,下次循环时会探索该rdd的依赖情况,直到找到款依赖划分新的stage为止
              waitingForVisit.push(dep.rdd) 
          }
        }
      }
    }
    waitingForVisit.push(rdd) // 将当前rdd压入栈中
    while (waitingForVisit.nonEmpty) { // 如果栈中有未被访问的rdd
      visit(waitingForVisit.pop()) // 
    }
    parents.toList
  }

 

 

 

 

 

参考资料:https://blog.csdn.net/dabokele/article/details/51902617

以上是关于深入理解spark-DAGscheduler源码分析(下)的主要内容,如果未能解决你的问题,请参考以下文章

Spark核心作业调度和任务调度之DAGScheduler源码

spark--job和DAGScheduler源码

Spark 源码解读SparkContext的初始化之创建和启动DAGScheduler

Spark源码剖析——SparkContext的初始化_创建和启动DAGScheduler

《深入理解SPARK:核心思想与源码分析》——SparkContext的初始化(中)

小记--------spark ——AGScheduler源码分析