Lesson 34: Stage Division and the Task Best-Location Algorithm in Spark Job Processing

Posted by sinat_25306771


Contents of this lesson

1.     The stage division algorithm for a job

2.     The algorithm for choosing the best location to run a task

 

 

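I. The Stage Division Algorithm

Spark operators are usually built as chains, which raises the question of how such chained computations should be executed. Spark's strategy is to first divide the operators into stages and then run the computation.

Because the data is stored in a distributed fashion across the nodes, Spark has to maximize data locality in order to reduce the cost of network transfer. Data locality here means that, at computation time, the data is already in memory on the node doing the work, or can be obtained from an existing cache without being recomputed.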
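The idea of preferring a location that already holds the data can be sketched in a few lines of plain Scala. This is only an illustration under simplifying assumptions, not Spark's scheduling code: `PartitionInfo`, `dataHosts` and `chooseHost` are hypothetical names, and the only rule modelled is "use a host that holds the data if one is available, otherwise fall back to any available host".

```scala
object LocalitySketch {
  // Hypothetical model: a partition and the hosts that already hold its data
  // (for example in memory or in a cache).
  final case class PartitionInfo(index: Int, dataHosts: Seq[String])

  /** Pick a host for the task that processes `p`.
    * Prefer an available host that already holds the data; otherwise fall back
    * to the first available host, in which case the data must cross the network.
    * Returns None when no host is available at all. */
  def chooseHost(p: PartitionInfo, availableHosts: Seq[String]): Option[String] =
    p.dataHosts.find(h => availableHosts.contains(h)).orElse(availableHosts.headOption)

  def main(args: Array[String]): Unit = {
    val hosts = Seq("node1", "node2", "node3")
    val parts = Seq(PartitionInfo(0, Seq("node2")), PartitionInfo(1, Seq("node9")))
    parts.foreach(p => println(s"partition ${p.index} -> ${chooseHost(p, hosts)}"))
  }
}
```

In this sketch partition 0 is placed on `node2`, where its data lives, while partition 1 has no local host available and falls back to `node1`.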

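1.      The idea behind the stage division algorithm

(1) A job consists of one or more stages

A job can have one or more stages. Stages are divided at wide dependencies; operators that produce wide dependencies include reduceByKey, groupByKey, and so on.

(2) Stages are executed in order, from front to back, according to their dependencies

A Spark application can trigger many jobs through different actions; that is, a single application can contain many jobs. Each job consists of one or more stages, and later stages depend on earlier ones: a later stage runs only after the stages it depends on have finished computing.

(3) Stage execution is lazy

All of the stages form a DAG (directed acyclic graph). Because RDDs are lazy, stages are lazy as well: the job is actually executed only when an action is encountered. Before the action, the Spark framework merely records the computation to be performed and does not run it.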
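The three points above can be illustrated with a small, self-contained sketch in plain Scala. It is a simplified model, not the DAGScheduler implementation: `Node`, `Narrow`, `Wide`, `Stage` and `divide` are hypothetical names, a `Node` stands in for an RDD, and a parent reached through more than one path is not deduplicated here. Building `Node` values only records the lineage (the lazy part); calling `divide` plays the role of what happens once an action fires. It walks backwards from the final node, keeps nodes linked by narrow dependencies in the same stage, and starts a new parent stage at every wide dependency.

```scala
import scala.collection.mutable.ListBuffer

object StageSketch {
  sealed trait Dep { def parent: Node }
  final case class Narrow(parent: Node) extends Dep // e.g. map, flatMap
  final case class Wide(parent: Node) extends Dep   // e.g. reduceByKey, groupByKey

  // Hypothetical stand-in for an RDD: a name plus its dependencies.
  final case class Node(name: String, deps: List[Dep] = Nil)

  // A stage: the operators pipelined together, plus the ids of the stages it waits for.
  final case class Stage(id: Int, nodes: List[String], parents: List[Int])

  /** Divide the lineage ending at `finalNode` into stages, parents first. */
  def divide(finalNode: Node): List[Stage] = {
    val stages = ListBuffer.empty[Stage]
    var nextId = 0

    // Builds the stage whose last operator is `last`; parent stages are built first.
    def build(last: Node): Int = {
      val members = ListBuffer.empty[String]
      val parentIds = ListBuffer.empty[Int]
      def visit(n: Node): Unit = {
        members += n.name
        n.deps.foreach {
          case Narrow(p) => visit(p)              // narrow: stay in the same stage
          case Wide(p)   => parentIds += build(p) // wide: stage boundary
        }
      }
      visit(last)
      val id = nextId
      nextId += 1
      stages += Stage(id, members.toList.reverse, parentIds.toList)
      id
    }

    build(finalNode)
    stages.toList
  }

  def main(args: Array[String]): Unit = {
    val lines  = Node("textFile")
    val words  = Node("flatMap", List(Narrow(lines)))
    val pairs  = Node("map", List(Narrow(words)))
    val counts = Node("reduceByKey", List(Wide(pairs)))
    divide(counts).foreach(println)
  }
}
```

For this word-count style lineage the sketch produces two stages: stage 0 contains textFile, flatMap and map, and stage 1 contains reduceByKey and lists stage 0 as its parent, so stage 0 must finish first.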

The code through which an action triggers job execution is shown below: it starts the job and posts a message.

/**
   * Return an array that contains all of the elements in this RDD.
   */
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }


 /**
   * Run a job on all partitions in an RDD and return the results in an array.
   */
  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.length)
  }

/**
   * Run a job on a given set of partitions of an RDD, but take a function of type
   * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: Iterator[T] => U,
      partitions: Seq[Int]): Array[U] = {
    val cleanedFunc = clean(func)
    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
  }


/**
   * Run a function on a given set of partitions in an RDD and return the results as an array.
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
    results
  }


/**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark.
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

// DAGScheduler.runJob: submits the job and blocks until it succeeds or fails.
def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case JobFailed(exception: Exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }

  // DAGScheduler.submitJob: validates the partitions, then posts JobSubmitted to the event loop.
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }

/**
 * A result-yielding job was submitted on a target RDD.
 * This event wraps which partitions are to be computed, the job listener, and so on.
 */
private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent

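For example, collect causes the runJob method in SparkContext to run, which ultimately leads to submitJob in DAGScheduler being executed. The core of this process is posting a case class object named JobSubmitted to eventProcessLoop.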
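The submit-then-wait pattern in the listing (post an event to a loop, hand back a waiter, block on it in runJob) can be reproduced in miniature with standard-library classes only. The sketch below is an analogy, not Spark code: `MiniScheduler` and `JobSubmittedEvent` are hypothetical names, a `Promise` plays the role of JobWaiter, and the "job" simply applies a function to each partition index.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

object EventLoopSketch {
  // Hypothetical event, loosely modelled on JobSubmitted: which partitions to
  // compute, the function to run, and a promise standing in for the waiter.
  final case class JobSubmittedEvent(
      jobId: Int,
      partitions: Seq[Int],
      func: Int => Int,
      done: Promise[Seq[Int]])

  final class MiniScheduler {
    private val queue = new LinkedBlockingQueue[JobSubmittedEvent]()

    // One daemon thread drains the queue, so events are handled one at a time.
    private val loop = new Thread(new Runnable {
      def run(): Unit = while (true) {
        val e = queue.take()
        // Complete the promise with the results, or with the failure if func throws.
        e.done.complete(Try(e.partitions.map(e.func)))
      }
    })
    loop.setDaemon(true)
    loop.start()

    /** Post the event and return the waiter immediately, without blocking. */
    def submitJob(jobId: Int, partitions: Seq[Int], func: Int => Int): Promise[Seq[Int]] = {
      val waiter = Promise[Seq[Int]]()
      queue.put(JobSubmittedEvent(jobId, partitions, func, waiter))
      waiter
    }

    /** Submit the job, then block until the event loop has finished it. */
    def runJob(jobId: Int, partitions: Seq[Int], func: Int => Int): Seq[Int] =
      Await.result(submitJob(jobId, partitions, func).future, 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    val scheduler = new MiniScheduler
    println(scheduler.runJob(0, Seq(0, 1, 2), _ * 10))
  }
}
```

Separating submitJob from runJob in this way means the caller's thread never does scheduling work itself: it only enqueues a message, and all scheduling state is touched by the single event-loop thread.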
