Spark Operator Execution Flow Explained, Part 7

Posted by 亮亮-AC米兰


31. union

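Merges two RDDs into one.
def union(other: RDD[T]): RDD[T] = withScope {
  if (partitioner.isDefined && other.partitioner == partitioner) { // both RDDs share the same partitioner
    new PartitionerAwareUnionRDD(sc, Array(this, other))
  } else { // the partitioners differ, or this RDD has none
    new UnionRDD(sc, Array(this, other))
  }
}
First, let's look at how the case with identical partitioners is handled: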
private[spark]
class PartitionerAwareUnionRDD[T: ClassTag](
    sc: SparkContext,
    var rdds: Seq[RDD[T]]
  ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
  require(rdds.length > 0)
  require(rdds.forall(_.partitioner.isDefined))
  require(rdds.flatMap(_.partitioner).toSet.size == 1,
    "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))

  override val partitioner = rdds.head.partitioner

  // Build one PartitionerAwareUnionRDDPartition per partition index; each one records
  // which partitions of the parent rdds make up the result partition with that index
  override def getPartitions: Array[Partition] = {
    val numPartitions = partitioner.get.numPartitions
    (0 until numPartitions).map(index => {
      new PartitionerAwareUnionRDDPartition(rdds, index)
    }).toArray
  }

  // Get the location where most of the partitions of parent RDDs are located
  override def getPreferredLocations(s: Partition): Seq[String] = {
    logDebug("Finding preferred location for " + this + ", partition " + s.index)
    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
    val locations = rdds.zip(parentPartitions).flatMap {
      case (rdd, part) => {
        val parentLocations = currPrefLocs(rdd, part)
        logDebug("Location of " + rdd + " partition " + part.index + " = " + parentLocations)
        parentLocations
      }
    }
    val location = if (locations.isEmpty) {
      None
    } else {
      // Find the location that maximum number of parent partitions prefer
      Some(locations.groupBy(x => x).maxBy(_._2.length)._1)
    }
    logDebug("Selected location for " + this + ", partition " + s.index + " = " + location)
    location.toSeq
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    // parents points at the parent partitions (one per RDD in rdds) that feed this partition
    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
    // Iterate over those parent partitions in turn and chain them into a single partition
    rdds.zip(parentPartitions).iterator.flatMap {
      case (rdd, p) => rdd.iterator(p, context)
    }
  }

  override def clearDependencies() {
    super.clearDependencies()
    rdds = null
  }

  // Get the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
  private def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] = {
    rdd.context.getPreferredLocs(rdd, part.index).map(tl => tl.host)
  }
}
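The location choice in getPreferredLocations is effectively a majority vote over the hosts preferred by the parent partitions. The following is a minimal sketch of that vote on a plain Seq[String], without Spark; the object name, the function name pickLocation, and the host names are hypothetical and used only for illustration.

```scala
object PreferredLocationSketch {
  // Return the host that appears most often among the parents' preferred hosts,
  // or None when no parent reports any location (same expression as in the source:
  // locations.groupBy(x => x).maxBy(_._2.length)._1).
  def pickLocation(locations: Seq[String]): Option[String] =
    if (locations.isEmpty) None
    else Some(locations.groupBy(x => x).maxBy(_._2.length)._1)

  def main(args: Array[String]): Unit = {
    // Hypothetical hosts: two parent partitions prefer host1, one prefers host2
    println(pickLocation(Seq("host1", "host2", "host1")))
    println(pickLocation(Seq.empty))
  }
}
```

When two hosts are tied, which one wins depends on the iteration order of the grouped map, so the sketch (like the original expression) makes no promise for ties.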
Its partition information is held in PartitionerAwareUnionRDDPartition:
class PartitionerAwareUnionRDDPartition(
    @transient val rdds: Seq[RDD[_]],
    val idx: Int
  ) extends Partition {
  // parents holds the parent Partition objects that feed partition index idx. The mapping is
  // one-to-one by index: e.g. result partition 1 draws from partition 1 of every RDD in rdds
  var parents = rdds.map(_.partitions(idx)).toArray

  override val index = idx
  override def hashCode(): Int = idx

  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
    // Update the reference to parent partition at the time of task serialization
    parents = rdds.map(_.partitions(index)).toArray
    oos.defaultWriteObject()
  }
}
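In other words, when the partitioners are identical, partition i of the resulting RDD is made up of partition i of each of the two original RDDs, read one after the other.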
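The index-wise merge can be illustrated with a small model that represents each RDD as a Vector of partitions instead of using Spark. This is only a sketch under that assumption; FakeRdd and the union function here are hypothetical names, not Spark APIs.

```scala
object PartitionerAwareUnionSketch {
  // Hypothetical stand-in for an RDD: a Vector of partitions, each a Vector of elements.
  type FakeRdd[T] = Vector[Vector[T]]

  // Result partition idx is the concatenation of partition idx of every parent,
  // mirroring rdds.zip(parentPartitions).iterator.flatMap(...) in compute.
  def union[T](rdds: Seq[FakeRdd[T]]): FakeRdd[T] = {
    require(rdds.nonEmpty)
    val numPartitions = rdds.head.length
    require(rdds.forall(_.length == numPartitions), "parents must have the same partition count")
    Vector.tabulate(numPartitions) { idx =>
      rdds.iterator.flatMap(rdd => rdd(idx)).toVector
    }
  }

  def main(args: Array[String]): Unit = {
    // Both inputs are laid out as if hash-partitioned by (value % 3)
    val a: FakeRdd[Int] = Vector(Vector(0, 3), Vector(1, 4), Vector(2))
    val b: FakeRdd[Int] = Vector(Vector(6), Vector(7), Vector(5, 8))
    println(union(Seq(a, b)))
  }
}
```

Because the parents already agree on where each key lives, the result keeps the same number of partitions and the same partitioner, and no data has to move between partitions.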
Now let's look at how the case with different partitioners is handled:
class UnionRDD[T: ClassTag](
    sc: SparkContext,
    var rdds: Seq[RDD[T]])
  extends RDD[T](sc, Nil) {  // Nil since we implement getDependencies

  override def getPartitions: Array[Partition] = {
    // Total number of partitions across all RDDs in rdds
    val array = new Array[Partition](rdds.map(_.partitions.length).sum)
    var pos = 0
    // Create one result partition per parent partition; each maps to exactly one parent partition
    for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
      // pos: index of the result partition; rdd: its parent RDD; rddIndex: position of that
      // parent in rdds; split.index: index of the partition within the parent RDD
      array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
      pos += 1
    }
    array
  }

  override def getDependencies: Seq[Dependency[_]] = {
    val deps = new ArrayBuffer[Dependency[_]]
    var pos = 0
    for (rdd <- rdds) {
      deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
      pos += rdd.partitions.length
    }
    deps
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    val part = s.asInstanceOf[UnionPartition[T]]
    // Read the data of the single parent partition that this result partition maps to
    parent[T](part.parentRddIndex).iterator(part.parentPartition, context)
  }

  override def getPreferredLocations(s: Partition): Seq[String] =
    s.asInstanceOf[UnionPartition[T]].preferredLocations()

  override def clearDependencies() {
    super.clearDependencies()
    rdds = null
  }
}
Here is what parentPartition means in UnionPartition:
/**
 * Partition for UnionRDD.
 *
 * @param idx index of the partition
 * @param rdd the parent RDD this partition refers to
 * @param parentRddIndex index of the parent RDD this partition refers to
 * @param parentRddPartitionIndex index of the partition within the parent RDD
 *                                this partition refers to
 */
private[spark] class UnionPartition[T: ClassTag](
    idx: Int,
    @transient rdd: RDD[T],
    val parentRddIndex: Int,
    @transient parentRddPartitionIndex: Int)
  extends Partition {
  // parentPartition is the Partition of the parent RDD whose index is parentRddPartitionIndex
  var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)

  def preferredLocations(): Seq[String] = rdd.preferredLocations(parentPartition)

  override val index: Int = idx

  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
    // Update the reference to parent split at the time of task serialization
    parentPartition = rdd.partitions(parentRddPartitionIndex)
    oos.defaultWriteObject()
  }
}
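So when the two partitioners differ, the result simply lists the partitions of the first RDD followed by the partitions of the second: the partition count is the sum of the two, and each result partition reads exactly one parent partition.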
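The index bookkeeping in getPartitions and getDependencies can be reproduced on plain integers. The sketch below takes only the partition counts of the parents as input; UnionSlot, layout and rangeStarts are hypothetical names introduced for this illustration.

```scala
object UnionPartitionSketch {
  // One entry per result partition: its own index, the parent's position in rdds,
  // and the partition index within that parent.
  final case class UnionSlot(pos: Int, parentRddIndex: Int, parentPartitionIndex: Int)

  // Same traversal order as the for-loop in UnionRDD.getPartitions.
  def layout(partitionCounts: Seq[Int]): Vector[UnionSlot] = {
    val slots = for {
      (count, rddIndex) <- partitionCounts.zipWithIndex
      partitionIndex <- 0 until count
    } yield (rddIndex, partitionIndex)
    slots.zipWithIndex.map { case ((r, p), pos) => UnionSlot(pos, r, p) }.toVector
  }

  // The outStart passed to each RangeDependency: the running sum of earlier partition counts.
  def rangeStarts(partitionCounts: Seq[Int]): Vector[Int] =
    partitionCounts.scanLeft(0)(_ + _).init.toVector

  def main(args: Array[String]): Unit = {
    // Two parents with 2 and 3 partitions give 5 result partitions
    layout(Seq(2, 3)).foreach(println)
    println(rangeStarts(Seq(2, 3)))
  }
}
```

With parents of 2 and 3 partitions, result partitions 0 and 1 come from the first parent and 2 to 4 from the second, which is exactly the range each RangeDependency describes.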

32. ++ (double plus)

It is simply an alias for union.
/**
 * Return the union of this RDD and another one. Any identical elements will appear multiple
 * times (use `.distinct()` to eliminate them).
 */
def ++(other: RDD[T]): RDD[T] = withScope {
  this.union(other)
}

33. intersection

Computes the intersection of two RDDs; each common value appears only once in the output.
/**
 * Return the intersection of this RDD and another one. The output will not contain any duplicate
 * elements, even if the input RDDs did.
 *
 * Note that this method performs a shuffle internally.
 */
def intersection(other: RDD[T]): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .keys
}
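The rough flow is as follows. Both RDDs are first mapped to pair RDDs of (value, null). A CoGroupedRDD is then built from them, and its values are converted into a pair of Iterables, one for each side. The result is filtered to keep only the keys that have entries on both the left and the right, and finally the keys are returned; these are the elements common to the two original RDDs.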
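The same map, cogroup, filter, keys pipeline can be traced on local collections. The sketch below is a simplified stand-in, not Spark's implementation: the cogroup function here is a hypothetical local helper that groups values by key without any partitioning or shuffle.

```scala
object IntersectionSketch {
  // Local stand-in for cogroup: for every key on either side,
  // collect that key's values from the left and from the right.
  def cogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      val leftVals = left.collect { case (key, v) if key == k => v }
      val rightVals = right.collect { case (key, w) if key == k => w }
      (k, (leftVals, rightVals))
    }.toMap
  }

  // Same steps as RDD.intersection: pair with null, cogroup, keep keys present on both sides.
  def intersection[T](left: Seq[T], right: Seq[T]): Set[T] =
    cogroup(left.map(v => (v, null)), right.map(v => (v, null)))
      .filter { case (_, (l, r)) => l.nonEmpty && r.nonEmpty }
      .keySet

  def main(args: Array[String]): Unit = {
    // Duplicates in the inputs (2 on the left, 3 on the right) appear once in the result
    println(intersection(Seq(1, 2, 2, 3), Seq(2, 3, 3, 4)))
  }
}
```

Deduplication falls out of the grouping step: each distinct value becomes one key, no matter how many times it occurred in either input.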
The cogroup function is implemented internally as follows:
/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
 * list of values for that key in `this` as well as `other`.
 */
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  // defaultPartitioner determines the partitioner to use by default
  cogroup(other, defaultPartitioner(self, other))
}

/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
 * list of values for that key in `this` as well as `other`.
 */
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("Default partitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  // Map each value of cg (an Array holding two groups) to a pair of Iterables
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}
CoGroupedRDD, which does the actual work, is implemented as follows:
/**
 * :: DeveloperApi ::
 * A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
 * tuple with the list of values for that key.
 *
 * Note: This is an internal API. We recommend users use RDD.cogroup(...) instead of
 * instantiating this directly.
 *
 * @param rdds parent RDDs.
 * @param part partitioner used to partition the shuffle output
 */
@DeveloperApi
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
  extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {

  // For example, `(k, a) cogroup (k, b)` produces k -> Array(ArrayBuffer as, ArrayBuffer bs).
  // Each ArrayBuffer is represented as a CoGroup, and the resulting Array as a CoGroupCombiner.
  // CoGroupValue is the intermediate state of each value before being merged in compute.
  private type CoGroup = CompactBuffer[Any]
  private type CoGroupValue = (Any, Int)  // Int is dependency number
  private type CoGroupCombiner = Array[CoGroup]

  private var serializer: Option[Serializer] = None

  /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
  def setSerializer(serializer: Serializer): CoGroupedRDD[K] = {
    this.serializer = Option(serializer)
    this
  }

  override def getDependencies: Seq[Dependency[_]] = {
    rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
      // If the parent's partitioner equals the CoGroupedRDD's partitioner, the dependency is narrow
      if (rdd.partitioner == Some(part)) {
        logDebug("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
        // Otherwise it is a wide (shuffle) dependency
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer)
      }
    }
  }

  /*
   * Build the partition descriptors. Each is a
   * CoGroupPartition(idx: Int, val narrowDeps: Array[Option[NarrowCoGroupSplitDep]]),
   * where idx is the partition index and narrowDeps holds one entry per parent dependency.
   */
  override def getPartitions: Array[Partition] = {
    val array = new Array[Partition](part.numPartitions)
    for (i <- 0 until array.length) {
      // Each CoGroupPartition will have a dependency per contributing RDD
      array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
        // Assume each RDD contributed a single dependency, and get it
        dependencies(j) match {
          // A shuffle dependency is recorded as None
          case s: ShuffleDependency[_, _, _] =>
            None
          // Anything else is a narrow dependency
          case _ =>
            Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
        }
      }.toArray)
    }
    array
  }

  override val partitioner: Some[Partitioner] = Some(part)

  // Compute one partition, described by the CoGroupPartition passed in
  override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
    val sparkConf = SparkEnv.get.conf
    // Decides whether the intermediate grouping runs purely in memory or in memory plus disk
    val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
    val split = s.asInstanceOf[CoGroupPartition]
    // Number of parent RDDs; each contributes one dependency, chosen by its partitioner
    val numRdds = dependencies.length

    // A list of (rdd iterator, dependency number) pairs:
    // each entry pairs an iterator over Product2 records with its dependency number
    val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
    for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
      // Narrow dependency: read the matching partition of the parent RDD directly
      case oneToOneDependency: OneToOneDependency[Product2[K, Any]] =>
        val dependencyPartition = split.narrowDeps(depNum).get.split
        // Read them from the parent
        val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
        rddIterators += ((it, depNum))
      // Shuffle dependency: fetch this partition's data from the shuffle output
      case shuffleDependency: ShuffleDependency[_, _, _] =>
        // Read map outputs of shuffle
        val it = SparkEnv.get.shuffleManager
          .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
          .read()
        rddIterators += ((it, depNum))
    }
    /*
     * With two parents, rddIterators now has the shape
     * [(Iterator[Product2[K, Any]], 0), (Iterator[Product2[K, Any]], 1)]
     */

    if (!externalSorting) { // group the intermediate results in memory only
      // CoGroupCombiner is an array of buffers; each distinct key gets exactly one entry
      val map = new AppendOnlyMap[K, CoGroupCombiner]
      val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
        if (hadVal) oldVal else Array.fill(numRdds)(new CoGroup)
      }
      val getCombiner: K => CoGroupCombiner = key => {
        map.changeValue(key, update)
      }
      // Walk every iterator and append each value to its key's buffer for that dependency
      rddIterators.foreach { case (it, depNum) =>
        while (it.hasNext) {
          val kv = it.next()
          getCombiner(kv._1)(depNum) += kv._2
        }
      }
      new InterruptibleIterator(context,
        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
    } else { // group the intermediate results in memory, spilling to disk when needed
      val map = createExternalMap(numRdds)
      // Insert everything into the ExternalAppendOnlyMap
      for ((it, depNum) <- rddIterators) {
        map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
      }
      context.taskMetrics.incMemoryBytesSpilled(map.memoryBytesSpilled)
      context.taskMetrics.incDiskBytesSpilled(map.diskBytesSpilled)
      new InterruptibleIterator(context,
        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
    }
  }

  ……
}
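So suppose two RDDs are cogrouped, where one is hash-partitioned into 3 partitions and the other has no partitioner. Assuming defaultPartitioner picks the existing hash partitioner, the CoGroupedRDD has 3 partitions: the hash-partitioned parent is read through a OneToOneDependency (partition i feeds partition i directly), while the other parent goes through a ShuffleDependency that redistributes its records into the same 3 partitions.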
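Two pieces of this flow can be sketched without Spark: the narrow-versus-shuffle decision made in getDependencies, and the per-key grouping done by the in-memory branch of compute. The sketch is a simplified model; FakePartitioner, DepKind, dependencyKinds and combine are hypothetical names, and immutable Vectors stand in for AppendOnlyMap and CompactBuffer.

```scala
object CoGroupSketch {
  sealed trait DepKind
  case object Narrow extends DepKind
  case object Shuffle extends DepKind

  // Hypothetical stand-in for a Partitioner: compared by value, like HashPartitioner.equals.
  final case class FakePartitioner(name: String, numPartitions: Int)

  // A parent is read through a narrow dependency only if its partitioner equals `part`.
  def dependencyKinds(parents: Seq[Option[FakePartitioner]], part: FakePartitioner): Seq[DepKind] =
    parents.map(p => if (p == Some(part)) Narrow else Shuffle)

  // For each key keep one buffer per parent; a value from parent depNum goes into buffer depNum.
  def combine[K, V](inputs: Seq[Seq[(K, V)]]): Map[K, Vector[Vector[V]]] = {
    val empty = Vector.fill(inputs.length)(Vector.empty[V])
    inputs.zipWithIndex.foldLeft(Map.empty[K, Vector[Vector[V]]]) { case (acc, (input, depNum)) =>
      input.foldLeft(acc) { case (m, (k, v)) =>
        val combiner = m.getOrElse(k, empty)
        m.updated(k, combiner.updated(depNum, combiner(depNum) :+ v))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val hash3 = FakePartitioner("hash", 3)
    // First parent is hash-partitioned into 3, second has no partitioner
    println(dependencyKinds(Seq(Some(hash3), None), hash3))
    println(combine(Seq(Seq(1 -> "a", 2 -> "b", 1 -> "c"), Seq(1 -> "x", 3 -> "y"))))
  }
}
```

A key that is missing from one parent still gets a buffer for it, which simply stays empty; this is what lets intersection test both sides with nonEmpty.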

34. glom

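glom collects the elements of each partition into a single array, yielding a new RDD[Array[T]] (built as a MapPartitionsRDD in the code below).
/**
 * Return an RDD created by coalescing all elements within each partition into an array.
 */
def glom(): RDD[Array[T]] = withScope {
  // Iterator(iter.toArray) turns the partition's elements into one array
  new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
As a result, every partition of the new RDD holds exactly one element: an array containing all the elements of the corresponding original partition.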
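A small model of this per-partition transformation, with partitions represented as plain Seqs rather than Spark partitions; the glom function below is a hypothetical local helper that applies the same Iterator(iter.toArray) idea to each partition.

```scala
import scala.reflect.ClassTag

object GlomSketch {
  // Each input partition becomes an output partition with a single element:
  // the array of everything that partition contained.
  def glom[T: ClassTag](partitions: Seq[Seq[T]]): Seq[Seq[Array[T]]] =
    partitions.map(p => Iterator(p.iterator.toArray).toSeq)

  def main(args: Array[String]): Unit = {
    val glommed = glom(Seq(Seq(1, 2), Seq(3), Seq.empty[Int]))
    // Print each partition's single array as a list for readability
    glommed.foreach(p => println(p.map(_.toList)))
  }
}
```

Note that an empty partition still produces one (empty) array, so the number of partitions is unchanged.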

35. cartesian

Returns the Cartesian product of two RDDs. This operation does not perform a shuffle.
/**
 * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
 * elements (a, b) where a is in `this` and b is in `other`.
 */
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
  new CartesianRDD(sc, this, other)
}
The work is done by CartesianRDD, shown next:
private[spark]
class CartesianRDD[T: ClassTag, U: ClassTag](
    sc: SparkContext,
    var rdd1 : RDD[T],
    var rdd2 : RDD[U])
  extends RDD[Pair[T, U]](sc, Nil)
  with Serializable {

  val numPartitionsInRdd2 = rdd2.partitions.length

  // The number of partitions is the product of the two RDDs' partition counts
  override def getPartitions: Array[Partition] = {
    // create the cross product split
    val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
      val idx = s1.index * numPartitionsInRdd2 + s2.index
      // Partition idx draws its data from partition s1.index of rdd1 and partition s2.index of rdd2
      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
    }
    array
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    (rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct
  }

  // Build this CartesianPartition's data by pairing every element of rdd1's partition s1
  // with every element of rdd2's partition s2
  override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    for (x <- rdd1.iterator(currSplit.s1, context);
         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
  }

  // Both dependencies are narrow
  override def getDependencies: Seq[Dependency[_]] = List(
    new NarrowDependency(rdd1) {
      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
    },
    new NarrowDependency(rdd2) {
      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
    }
  )

  override def clearDependencies() {
    super.clearDependencies()
    rdd1 = null
    rdd2 = null
  }
}
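Concretely, result partition idx = i * numPartitionsInRdd2 + j pairs every element of partition i of rdd1 with every element of partition j of rdd2.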
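The index arithmetic and its inverse (used by the two NarrowDependency instances) can be checked on a small model. This is a sketch that represents each RDD as a Vector of partitions; partitionIndex, parents and cartesian are hypothetical helper names, not Spark APIs.

```scala
object CartesianSketch {
  // Result partition index for the pair (s1, s2), as computed in getPartitions.
  def partitionIndex(s1: Int, s2: Int, numPartitionsInRdd2: Int): Int =
    s1 * numPartitionsInRdd2 + s2

  // Inverse mapping: which partition of rdd1 and of rdd2 feed result partition id.
  def parents(id: Int, numPartitionsInRdd2: Int): (Int, Int) =
    (id / numPartitionsInRdd2, id % numPartitionsInRdd2)

  // Iterating rdd1's partitions in the outer loop yields partitions in partitionIndex order.
  def cartesian[T, U](rdd1: Vector[Vector[T]], rdd2: Vector[Vector[U]]): Vector[Vector[(T, U)]] =
    for (p1 <- rdd1; p2 <- rdd2) yield {
      for (x <- p1; y <- p2) yield (x, y)
    }

  def main(args: Array[String]): Unit = {
    val rdd1 = Vector(Vector(1), Vector(2, 3))                 // 2 partitions
    val rdd2 = Vector(Vector("a"), Vector("b"), Vector("c"))   // 3 partitions
    val result = cartesian(rdd1, rdd2)                         // 2 * 3 = 6 partitions
    println(result.length)
    println(result(partitionIndex(1, 1, rdd2.length)))
  }
}
```

Since every result partition reads whole parent partitions in place, no shuffle is needed, at the cost of each parent partition being read once for every partition of the other RDD.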
