stage划分算法

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了stage划分算法相关的知识,希望对你有一定的参考价值。

stage划分算法总结

  1. 最后一个RDD创建finalstage   

  2. finalstage倒推

  3. 通过宽依赖,来进行新的stage划分

  4. 使用递归,依次提交stage,从父stage开始

源码 org.apache.spark.scheduler包下

stage划分算法由 submitStage和getMissingParentStages方法组成

第一步:使用触发job的最后一个RDD,创建finalstage,传入到newstage方法中

  var finalStage: Stage = null

  //创建一个stage对象,并且将stage加入到DAGscheduler中

  finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)

  第二步:用finalstage创建一个job,也就是说,这个job的最后一个stage,当然就是finalstage

  val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)

   第三步:将job加入到内存缓存中

   jobIdToActiveJob(jobId) = job

  activeJobs += job

   finalStage.resultOfJob = Some(job)

   val stageIds = jobIdToStageIds(jobId).toArray

   val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))

   listenerBus.post(

  SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))

第四步:使用submitStage方法提交finalstage(尝试)

   submitStage(finalStage)

  //调用getMissingParentStages方法,去获取当前stage的父stage

   val missing = getMissingParentStages(stage).sortBy(_.id)

  //首先往栈中,推入了最后一个RDD

    waitingForVisit.push(stage.rdd)

//然后进行while循环,调用自己内部定义的visit()方法

    while (!waitingForVisit.isEmpty) {

      visit(waitingForVisit.pop())

    }

在visit()方法内,遍历RDD的依赖

for (dep <- rdd.dependencies) 

如果是窄依赖,那么将依赖的RDD放入栈中

case narrowDep: NarrowDependency[_] =>

                waitingForVisit.push(narrowDep.rdd)

如果是宽依赖,那么使用依赖的RDD创建一个新的stage,并且会将isShuffleMap设置为true

(默认的最后一个stage不是shuffleMap stage)

除了finalstage都是shuffleMap stage

              case shufDep: ShuffleDependency[_, _, _] =>

                val mapStage = getShuffleMapStage(shufDep, stage.jobId)


        if (missing == Nil) {

    //如果吗没有父stage则执行

          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")

          submitMissingTasks(stage, jobId.get)

        } else {

          //递归调用submit方法去去提交父stage

          for (parent <- missing) {

            submitStage(parent)

          }

          //并且将当前stage放入等待执行的stage队列中

          waitingStages += stage

        }


  /*

   * 提交stage的方法

   */

  private def submitStage(stage: Stage) {

    val jobId = activeJobForStage(stage)

    if (jobId.isDefined) {

      logDebug("submitStage(" + stage + ")")

      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {

        //调用getMissingParentStages方法,去获取当前stage的父stage

        val missing = getMissingParentStages(stage).sortBy(_.id)

        logDebug("missing: " + missing)

        if (missing == Nil) {

          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")

          submitMissingTasks(stage, jobId.get)

        } else {

          for (parent <- missing) {

            submitStage(parent)

          }

          waitingStages += stage

        }

      }

    } else {

      abortStage(stage, "No active job for stage " + stage.id)

    }

  }



  /*

   * 获取某个stage的父stage方法

   */

  private def getMissingParentStages(stage: Stage): List[Stage] = {

    val missing = new HashSet[Stage]

    val visited = new HashSet[RDD[_]]

    // We are manually maintaining a stack here to prevent StackOverflowError

    // caused by recursively visiting

    val waitingForVisit = new Stack[RDD[_]]

    def visit(rdd: RDD[_]) {

      if (!visited(rdd)) {

        visited += rdd

        if (getCacheLocs(rdd).contains(Nil)) {

          for (dep <- rdd.dependencies) {

//遍历RDD的父依赖

            dep match {

              case shufDep: ShuffleDependency[_, _, _] =>

                val mapStage = getShuffleMapStage(shufDep, stage.jobId)

                if (!mapStage.isAvailable) {

                  missing += mapStage

                }

              case narrowDep: NarrowDependency[_] =>

                waitingForVisit.push(narrowDep.rdd)

            }

          }

        }

      }

    }

//首先往栈中,推入了最后一个RDD

    waitingForVisit.push(stage.rdd)

//然后进行循环,调用自己内部定义的visit()方法

    while (!waitingForVisit.isEmpty) {

      visit(waitingForVisit.pop())

    }

    missing.toList

  }


以上是关于stage划分算法的主要内容,如果未能解决你的问题,请参考以下文章

[Spark传奇行动] 第34课:Stage划分和Task最佳位置算法源码彻底解密

第三十四课 Spark中任务处理的Stage划分和Task最佳位置算法

Spark源码剖析:stage划分原理与源码剖析

Spark作业调度中stage的划分

Spark作业调度中stage的划分

Spark(10)——Spark的Stage如何划分