Spark算子执行流程详解之七
Posted 亮亮-AC米兰
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark算子执行流程详解之七相关的知识,希望对你有一定的参考价值。
31.union
将2个rdd合并在一起。
def union(other: RDD[T]): RDD[T] = withScope if (partitioner.isDefined && other.partitioner == partitioner) //两者的分区函数相同 new PartitionerAwareUnionRDD(sc, Array(this, other)) else //两者的分区函数不同 new UnionRDD(sc, Array(this, other)) |
先来看当两者的分区函数相同时其是如何处理的:
private[spark] class PartitionerAwareUnionRDD[T: ClassTag]( sc: SparkContext, var rdds: Seq[RDD[T]] ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) require(rdds.length > 0) require(rdds.forall(_.partitioner.isDefined)) require(rdds.flatMap(_.partitioner).toSet.size == 1, "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner)) override val partitioner = rdds.head.partitioner //生成PartitionerAwareUnionRDDPartition,保存了组成某个分区索引为index的分区来源于rdds的哪几个分区 override def getPartitions: Array[Partition] = val numPartitions = partitioner.get.numPartitions (0 until numPartitions).map(index => new PartitionerAwareUnionRDDPartition(rdds, index) ).toArray // Get the location where most of the partitions of parent RDDs are located override def getPreferredLocations(s: Partition): Seq[String] = logDebug("Finding preferred location for " + this + ", partition " + s.index) val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents val locations = rdds.zip(parentPartitions).flatMap case (rdd, part) => val parentLocations = currPrefLocs(rdd, part) logDebug("Location of " + rdd + " partition " + part.index + " = " + parentLocations) parentLocations val location = if (locations.isEmpty) None else // Find the location that maximum number of parent partitions prefer Some(locations.groupBy(x => x).maxBy(_._2.length)._1) logDebug("Selected location for " + this + ", partition " + s.index + " = " + location) location.toSeq override def compute(s: Partition, context: TaskContext): Iterator[T] = //parents即指向了该分区来源于的rdds组合的哪几个分区 val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents //然后就是遍历原始rdds组合的某几个分区组成单个分区 rdds.zip(parentPartitions).iterator.flatMap case (rdd, p) => rdd.iterator(p, context) override def clearDependencies() super.clearDependencies() rdds = null // Get the *current* preferred locations from the DAGScheduler (as opposed to the static ones) private def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] = rdd.context.getPreferredLocs(rdd, part.index).map(tl => tl.host) |
其分区信息为PartitionerAwareUnionRDDPartition:
class PartitionerAwareUnionRDDPartition( @transient val rdds: Seq[RDD[_]], val idx: Int ) extends Partition // parents保存了对于分区索引idx来源于rdds的Partition信息,其实就是一一对应,比方说第1个分区来源于rdds组合中的每个rdd的第一个分区 var parents = rdds.map(_.partitions(idx)).toArray override val index = idx override def hashCode(): Int = idx @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException // Update the reference to parent partition at the time of task serialization parents = rdds.map(_.partitions(index)).toArray oos.defaultWriteObject() |
其实当分区函数相同时,其结果的RDD的对应分区来源于原始两个RDD的对应分区,即:
再来看当两者的分区函数不相同时其是如何处理的:
class UnionRDD[T: ClassTag]( sc: SparkContext, var rdds: Seq[RDD[T]]) extends RDD[T](sc, Nil) // Nil since we implement getDependencies override def getPartitions: Array[Partition] = //计算rdds组合总共有几个分区 val array = new Array[Partition](rdds.map(_.partitions.length).sum) var pos = 0 //总共有几个分区就生成几个分区,其每个分区各自对应rdds组合中的分区 for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) // pos:分区索引,rdd:该分区的父rdd,rddIndex:父rdd在rdds中的索引,split.index:该分区的Partition信息 array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index) pos += 1 array override def getDependencies: Seq[Dependency[_]] = val deps = new ArrayBuffer[Dependency[_]] var pos = 0 for (rdd <- rdds) deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length) pos += rdd.partitions.length deps override def compute(s: Partition, context: TaskContext): Iterator[T] = val part = s.asInstanceOf[UnionPartition[T]] //rdds组合中的某个rdd所对应的分区数据 parent[T](part.parentRddIndex).iterator(part.parentPartition, context) override def getPreferredLocations(s: Partition): Seq[String] = s.asInstanceOf[UnionPartition[T]].preferredLocations() override def clearDependencies() super.clearDependencies() rdds = null |
且看UnionPartition中parentPartition代表的意思:
/** * Partition for UnionRDD. * * @param idx index of the partition * @param rdd the parent RDD this partition refers to * @param parentRddIndex index of the parent RDD this partition refers to * @param parentRddPartitionIndex index of the partition within the parent RDD * this partition refers to */ private[spark] class UnionPartition[T: ClassTag]( idx: Int, @transient rdd: RDD[T], val parentRddIndex: Int, @transient parentRddPartitionIndex: Int) extends Partition // parentPartition来源于该分区对应父rdd的分区索引为parentRddPartitionIndex的Partition var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex) def preferredLocations(): Seq[String] = rdd.preferredLocations(parentPartition) override val index: Int = idx @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException // Update the reference to parent split at the time of task serialization parentPartition = rdd.partitions(parentRddPartitionIndex) oos.defaultWriteObject() |
因此当两者的分区函数不相同时,其执行流程如下:
32. 两个加加++
其作用就是union
/** * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). */ def ++(other: RDD[T]): RDD[T] = withScope this.union(other) |
33.intersection
求2个RDD的交集,其中相同的值只输出一次。
/** * Return the intersection of this RDD and another one. The output will not contain any duplicate * elements, even if the input RDDs did. * * Note that this method performs a shuffle internally. */ def intersection(other: RDD[T]): RDD[T] = withScope this.map(v => (v, null)).cogroup(other.map(v => (v, null))) .filter case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty .keys |
其大致流程是先求交的两个rdd映射为KV对的pairRDD,其中V为null,然后生成CoGroupedRDD,接着对CoGroupedRDD的values进行转化为V为两个迭代器,紧接着进行筛选,保留左右两边rdd都存在的记录,最后返回其KEY值,即原始的左右两RDD的内容。
cogroup函数的内部实现如下:
/** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope //defaultPartitioner求默认的分区函数 cogroup(other, defaultPartitioner(self, other)) /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) throw new SparkException("Default partitioner cannot partition array keys.") val cg = new CoGroupedRDD[K](Seq(self, other), partitioner) //将cg的values映射为两个数组的迭代器 cg.mapValues case Array(vs, w1s) => (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]]) |
因此CoGroupedRDD的实现如下:
/** * :: DeveloperApi :: * A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a * tuple with the list of values for that key. * * Note: This is an internal API. We recommend users use RDD.cogroup(...) instead of * instantiating this directly. * @param rdds parent RDDs. * @param part partitioner used to partition the shuffle output */ @DeveloperApi class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner) extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) // For example, `(k, a) cogroup (k, b)` produces k -> Array(ArrayBuffer as, ArrayBuffer bs). // Each ArrayBuffer is represented as a CoGroup, and the resulting Array as a CoGroupCombiner. // CoGroupValue is the intermediate state of each value before being merged in compute. private type CoGroup = CompactBuffer[Any] private type CoGroupValue = (Any, Int) // Int is dependency number private type CoGroupCombiner = Array[CoGroup] private var serializer: Option[Serializer] = None /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */ def setSerializer(serializer: Serializer): CoGroupedRDD[K] = this.serializer = Option(serializer) this override def getDependencies: Seq[Dependency[_]] = rdds.map rdd: RDD[_ <: Product2[K, _]] => //如果rdd的分区函数和CoGroupedRDD的分区函数相同,则相互之间的依赖是窄依赖 if (rdd.partitioner == Some(part)) logDebug("Adding one-to-one dependency with " + rdd) new OneToOneDependency(rdd) else //否则是宽依赖 logDebug("Adding shuffle dependency with " + rdd) new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer) /* * 获取其分区配置信息CoGroupPartition,其由CoGroupPartition( idx: Int, val narrowDeps: Array[Option[NarrowCoGroupSplitDep]])组成 其中idx代表对应分区索引,narrowDeps存储的是其依赖的数组 */ override def getPartitions: Array[Partition] = val array = new Array[Partition](part.numPartitions) for (i <- 0 until array.length) // Each CoGroupPartition will have a dependency per contributing RDD array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map case (rdd, j) => // Assume each RDD contributed a single dependency, and get it dependencies(j) match //宽依赖直接返回None case s: ShuffleDependency[_, _, _] => None case _ => //其他则为窄依赖 Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))) .toArray) array override val partitioner: Some[Partitioner] = Some(part) //在每个分区上根据传入的CoGroupPartition进行计算 override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = val sparkConf = SparkEnv.get.conf //此参数决定了其中间整理的过程是在内存中执行还是内存+磁盘中执行 val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true) val split = s.asInstanceOf[CoGroupPartition] //代表有多少个rdd,每个rdd根据分区函数对应其依赖 val numRdds = dependencies.length // A list of (rdd iterator, dependency number) pairs // rddIterators是个KV的迭代器,其K为Product2的迭代器,其V是其索引 val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)] for ((dep, depNum) <- dependencies.zipWithIndex) dep match //如果是窄依赖,则直接拉取父RDD对应分区的数值 case oneToOneDependency: OneToOneDependency[Product2[K, Any]] => val dependencyPartition = split.narrowDeps(depNum).get.split // Read them from the parent val it = oneToOneDependency.rdd.iterator(dependencyPartition, context) rddIterators += ((it, depNum)) //如果是宽依赖,则从shuffle的中间结果拉取对应分区的数值 case shuffleDependency: ShuffleDependency[_, _, _] => // Read map outputs of shuffle val it = SparkEnv.get.shuffleManager .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context) .read() rddIterators += ((it, depNum)) /* * rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)] * [Iterator[Product2[K, Any]],0,Iterator[Product2[K, Any]],1] * */ if (!externalSorting) //在内存中整理中间结果 val map = new AppendOnlyMap[K, CoGroupCombiner]//CoGroupCombiner为Buffer数组,相同的K只会保留1个 val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => if (hadVal) oldVal else Array.fill(numRdds)(new CoGroup) val getCombiner: K => CoGroupCombiner = key => map.changeValue(key, update) //遍历迭代器数组,将相同的KEY的V存放在CoGroupCombiner里面 rddIterators.foreach case (it, depNum) => while (it.hasNext) val kv = it.next() getCombiner(kv._1)(depNum) += kv._2 new InterruptibleIterator(context, map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]]) else //在内存+磁盘中整理中间结果 val map = createExternalMap(numRdds) //插入到ExternalAppendOnlyMap里面 for ((it, depNum) <- rddIterators) map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum)))) context.taskMetrics.incMemoryBytesSpilled(map.memoryBytesSpilled) context.taskMetrics.incDiskBytesSpilled(map.diskBytesSpilled) new InterruptibleIterator(context, map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]]) …… |
因此假设两个RDD执行cogroup,其中一个rdd的分区函数为hash分区,分区个数为3,另外一个rdd没有分区函数,则其执行流程如下:
34.glom
glom函数将每个分区形成一个数组,得到一个新的GlommedRDD。
/** * Return an RDD created by coalescing all elements within each partition into an array. */ def glom(): RDD[Array[T]] = withScope //通过Iterator(iter.toArray)将其转化为数组 new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray)) |
其执行过程如下:
35.cartesian
这个操作返回两个RDD的笛卡尔集,这个操作不会执行shuffle
/** * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of * elements (a, b) where a is in `this` and b is in `other`. */ def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope new CartesianRDD(sc, this, other) |
其主要由CartesianRDD实现,继续往下看:
private[spark] class CartesianRDD[T: ClassTag, U: ClassTag]( sc: SparkContext, var rdd1 : RDD[T], var rdd2 : RDD[U]) extends RDD[Pair[T, U]](sc, Nil) with Serializable val numPartitionsInRdd2 = rdd2.partitions.length //分区个数为两个rdd的分区数目之积 override def getPartitions: Array[Partition] = // create the cross product split val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length) for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) val idx = s1.index * numPartitionsInRdd2 + s2.index //分区索引idx的数据来源于rdd1的index分区和rdd2的index分区 array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index) array override def getPreferredLocations(split: Partition): Seq[String] = val currSplit = split.asInstanceOf[CartesianPartition] (rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct //通过遍历rdd1的s1分区和rdd2的s2分区组装成当前CartesianPartition的分区数据 override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = val currSplit = split.asInstanceOf[CartesianPartition] for (x <- rdd1.iterator(currSplit.s1, context); y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) //返回的都是窄依赖 override def getDependencies: Seq[Dependency[_]] = List( new NarrowDependency(rdd1) def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2) , new NarrowDependency(rdd2) def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2) ) override def clearDependencies() super.clearDependencies() rdd1 = null rdd2 = null |
其具体的执行过程如下:
以上是关于Spark算子执行流程详解之七的主要内容,如果未能解决你的问题,请参考以下文章