大数据:Spark CoreDriver上的Task的生成分配调度

Posted raintungli

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据:Spark CoreDriver上的Task的生成分配调度相关的知识,希望对你有一定的参考价值。

1. 什么是Task?

在前面的章节里描述过几个角色,Driver(Client),Master,Worker(Executor),Driver会提交Application到Master进行Worker上的Executor上的调度,显然这些都不是Task.

Spark上的几个关系可以这样理解:

  • Application: Application是Driver在构建SparkContent的上下文的时候创建的,就像申报员,现在要构建一个能完成任务的集群,需要申报的是这次需要多少个Executor(可以简单理解为虚拟的机器),每个Executor需要多少CPU,多少内存。
  • Job: 这是Driver在调用Action的时候生成的Job,让DAGScheduler线程进行最后的调度,代表着这时候RDD的状态分析完了,需要进行最后的Stage分析了,Job并不是提交给Executor运行的,一个Application存在多个Job
  • Task: 一个Job可以分成多个Task, 相当于这些Task分到了一个Group里,这个Group的ID就是Job ID

2. Task的类型

Task的类型和Stage相关,关于Stage,以及Stage之间的相关依赖构成任务的不同提交,就不在这篇描述了

ShuffleMapStage 转化成 ShuffleMapTask

ResultStage 转化成为 ResultTask

当Spark上的action算子,通过DAG进行提交任务的时候,会通过Stage来决定提交什么类型的任务,具体的实现都在DAGScheduler.scala 的submitMissingTasks方法中。

3. 同一个Stage的Task数量

Spark是一个分布式的执行任务的框架,那么同一个Stage的并行任务的拆分就非常的重要,在任务的分解中并不只是stage的步骤的分解,同时也是对同一个Stage中的要分析的数据分解,而对数据的分解直接决定对同一个Stage所提交的任务的数量。对Stage的任务拆解决定着任务的之间的关系,而对同一个Stage的分析数据进行拆解控制着任务的数量。

比如基于拆解的分析数据的而执行的算子象map,这些任务都是独立的,并没有对数据进行最后的归并和整理,这些task是完全可以进行并行计算的,对同一个Stage的task的数量在Spark上是可以控制的。

在这里以ParallelCollectionRDD为简单的例子,先看DAGScheduler.submitMissingTasks的方法

 private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")
    // Get our pending tasks and remember them in our pendingTasks entry
    stage.pendingPartitions.clear()

    // First figure out the indexes of partition ids to compute.
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
    。。。。。。。。。。。
    val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

生产task的数量是由val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()来决定的,在ShuffleMapStage里

override def findMissingPartitions(): Seq[Int] = {
    val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)
    assert(missing.size == numPartitions - _numAvailableOutputs,
      s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
    missing
  }

可以看到具体是由numPartitions来决定的,在来看numPartitions

val numPartitions = rdd.partitions.length
由rdd.partitions来决定的,对ShuffleMapStage来说rdd就是最后一个value类型的transformation 的RDD,比如常见的MapPartitionsRDD

在MapPartitionsRDD来说的partitions

  override def getPartitions: Array[Partition] = firstParent[T].partitions
是transformation的算子链中的第一个,我们以ParallelCollectionRDD为例子,比如常见的对应的例子:

sparkcontext.parallelize(exampleApacheLogs)
在ParallelCollectionRDD中

override def getPartitions: Array[Partition] = {
    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
  }
在ParallelCollectionRDD中数据的Partitions是由numSlices来决定的

  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }
numSlices 是可以在parallelize函数中传入,而默认使用defaultParallelism的参数控制

def defaultParallelism: Int = {
    assertNotStopped()
    taskScheduler.defaultParallelism
  }
override def defaultParallelism(): Int = backend.defaultParallelism()

在CoarseGrainedSchedulerBackend.scala 的类中:

  override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
  }

默认的值是受以下控制:

  1. 配置文件spark.default.parallelism
  2. totalCoreCount 的值: CoarseGrainedSchedulerBackend是一个executor管理的backend,里面维护着executor的信息,totalCoreCount就是executor汇报上来的核数,注意因为executor汇报自己是在application分配好后发生的,汇报的信息和获取totalCoreCount的线程是异步的,也就是如果executor没有汇报上来,totalCoreCount.get()的值并不准确(根绝Master对executor的分配策略,是无法保证分配多少个executor, 在这里spark更依赖executor主动的向driver汇报),这里的策略是无法保证准确的获取executor的核数。
  3. 如果没有设置spark.default.parallelism,最小值是2

依赖于rdd.partitions的策略,最后决定task的分配数量。

4. Task的提交和调度分配


以上是关于大数据:Spark CoreDriver上的Task的生成分配调度的主要内容,如果未能解决你的问题,请参考以下文章

如何优化大数据框上的 spark sql 操作?

Scala和Spark的大数据分析

大数据处理为何选择spark?

大数据Spark入门以及集群搭建

[Spark快速大数据分析]Spark基础

Spark 之 解决数据倾斜