Spark算子执行流程详解之六

Posted 亮亮-AC米兰

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark算子执行流程详解之六相关的知识,希望对你有一定的参考价值。

26.coalesce

coalesce顾名思义为合并,就是把多个分区的RDD合并成少量分区的RDD,这样可以减少任务调度的时间,但是请记住:合并之后不能保证结果RDD中的每个分区的记录数量是均衡的,因为合并的时候并没有考虑合并前每个分区的记录数,合并只会减少RDD的分区个数,因此并不能利用它来解决数据倾斜的问题。

def coalesce(numPartitions: Int, shuffle: Boolean =false)(implicitord:Ordering[T] =null)
    : RDD[T] = withScope
  if (shuffle)
    /** Distributes elements evenly across output partitions, starting from a random partition. */
   
val distributePartition = (index: Int, items:Iterator[T]) =>
      var position = (newRandom(index)).nextInt(numPartitions)//针对不同的分区索引初始化一个随机数

//将原来的记录映射为(K,记录)对,其中K为随机数的不断叠加
      items.map t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
       
position = position + 1
       
(position, t)
     
    : Iterator[(Int, T)]
    // include a shuffle step so that our upstream tasks are still distributed

//针对(k,记录)进行一次Hash分区
   
new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
      new HashPartitioner(numPartitions)),
      numPartitions).values//由于是KV对,最后再取其V即可
  else
    new CoalescedRDD(this, numPartitions)
 

先看其shuffle参数,如果为true的话,则先生成一个ShuffleRDD,然后在这基础上产生CoalescedRDD,如果为false的话,则直接生成CoalescedRDD。因此先看下其ShuffleRDD的生成过程:


以上是将3个分区合并成2个分区,当shuffle为true的时候,其CoalescedRDD父RDD即ShuffledRDD的生成过程,如果shuffle为false的时候,则直接利用其本身取生成CoalescedRDD。

       再来看CoalescedRDD的计算过程:

private[spark] classCoalescedRDD[T: ClassTag](
    @transient varprev: RDD[T],
    maxPartitions: Int,
    balanceSlack: Double = 0.10)
  extends RDD[T](prev.context,Nil)  // Nil since we implement getDependencies
 
override def getPartitions: Array[Partition] =
    val pc = newPartitionCoalescer(maxPartitions, prev, balanceSlack)
    pc.run().zipWithIndex.map
      case (pg, i) =>
        val ids = pg.arr.map(_.index).toArray
        new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
    
 
  override def compute(partition: Partition, context: TaskContext):Iterator[T] =
    partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap parentPartition =>
      firstParent[T].iterator(parentPartition, context)
   
 
  ……

/**
 * Class that captures a coalesced RDD by essentially keeping track of parent partitions
 *
@param index of this coalesced partition
 *
@param rdd which it belongs to

* parentsIndices它代表了当前CoalescedRDD对应分区索引的分区是由父RDD的哪几个分区组成的
 *
@param parentsIndices list of indices in the parent that have been coalesced into this partition

* @param preferredLocation the preferred location for this partition
 */
private[spark] case class CoalescedRDDPartition(
    index: Int,
    @transient rdd: RDD[_],
    parentsIndices: Array[Int],
    @transient preferredLocation: Option[String] = None)extendsPartition
  var parents:Seq[Partition] =parentsIndices.map(rdd.partitions(_))

  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException
    // Update the reference to parent partition at the time of task serialization
   
parents
= parentsIndices.map(rdd.partitions(_))
    oos.defaultWriteObject()
 

  /**
   * Computes the fraction of the parents' partitions containing preferredLocation within
   * their getPreferredLocs.
   *
@return locality of this coalesced partition between 0 and 1
   */
 
def localFraction: Double =
    val loc = parents.count p =>
      val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host)
      preferredLocation.exists(parentPreferredLocations.contains)
   
    if (parents.size ==0)0.0 else(loc.toDouble /parents.size.toDouble)
 

CoalescedRDD的分区结果由CoalescedRDDPartition决定,其中parentsIndices参数代表了CoalescedRDD的某个分区索引的分区来源于其父RDD的哪几个分区,然后就是利用flatMap把父RDD的多个分区串联起来。因此主要关注CoalescedRDD是如何生成CoalescedRDDPartition的,即

override def getPartitions: Array[Partition] =
  val pc = newPartitionCoalescer(maxPartitions, prev, balanceSlack)

  pc.run().zipWithIndex.map
    case (pg, i) =>
      val ids = pg.arr.map(_.index).toArray
      new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
 

通过PartitionCoalescer来计算生成CoalescedRDDPartition:

/**
 * Runs the packing algorithm and returns an array of PartitionGroups that if possible are
 * load balanced and grouped by locality
 *
@return array of partition groups
 */
def run(): Array[PartitionGroup] =

//设置一个个group
  setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
 
 //然后把父rdd的还没有分配的partition放置到一个个group

throwBalls() // assign partitions (balls) to each group (bins)
 
getPartitions

首先生成一个个PartitionGroup,里面的arr保存了父rdd的分区索引,然后把其他父rdd没有分配的分区投放至PartitionGroup里面。先看setupGroups的过程,它首先生成targetLen个PartitionGroup,里面包含了初始默认的父rdd的分区索引,其流程如下:

/**
 * Initializes targetLen partition groups and assigns a preferredLocation
 * This uses coupon collector to estimate how many preferredLocations it must rotate through
 * until it has seen most of the preferred locations (2 * n log(n))
 * @param targetLen
 */
  def setupGroups(targetLen: Int) 
  val rotIt = new LocationIterator(prev)
  // deal with empty case, just create targetLen partition groups with no preferred location
//如果父RDD的分区没有本地性,则直接生成targetLenPartitionGroup返回
  if (!rotIt.hasNext) 
    (1 to targetLen).foreach(x => groupArr += PartitionGroup())
    return
  
  noLocality = false
  // number of iterations needed to be certain that we've seen most preferred locations
  val expectedCoupons2 = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt
  var numCreated = 0
  var tries = 0
  // rotate through until either targetLen unique/distinct preferred locations have been created
  // OR we've rotated expectedCoupons2, in which case we have likely seen all preferred locations,
  // i.e. likely targetLen >> number of preferred locations (more buckets than there are machines)
//优先针对每台主机建立其对应的PartitionGroup,目的是为了让之后的计算更加分散
  while (numCreated < targetLen && tries < expectedCoupons2) 
    tries += 1
    // rotIt.next()的返回值为(String, Partition),其中nxt_replica为主机名 nxt_part为分区索引
    val (nxt_replica, nxt_part) = rotIt.next()
    if (!groupHash.contains(nxt_replica)) 
      val pgroup = PartitionGroup(nxt_replica)
      groupArr += pgroup
      addPartToPGroup(nxt_part, pgroup)//将其分区索引添加进此PartitionGroup
        groupHash.put(nxt_replica, ArrayBuffer(pgroup)) // list in case we have multiple
      numCreated += 1
    
  
//如果还没有足够多的PartitionGroup,实在不行则针对同一个主机名可以创建多个PartitionGroup
  while (numCreated < targetLen)   // if we don't have enough partition groups, create duplicates
    //(String, Partition) 主机名 分区索引
    var (nxt_replica, nxt_part) = rotIt.next()
    val pgroup = PartitionGroup(nxt_replica)
    groupArr += pgroup
    //  val groupHash = mutable.Map[String, ArrayBuffer[PartitionGroup]]() (主机名,PartitionGroup(主机名)),同一个主机名可能存在多个PartitionGroup
    groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
    var tries = 0
//将其分区索引添加进此PartitionGroup
    while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen)  // ensure at least one part
      nxt_part = rotIt.next()._2
      tries += 1
    
    numCreated += 1
  
 

然后将剩余没有分配的父rdd的分区分配至对应的PartitionGroup

def throwBalls() 
  if (noLocality)   // no preferredLocations in parent RDD, no randomization needed 没有本地性,少分区合并成多分区,无法合并,保持原样
      if (maxPartitions > groupArr.size)  // just return prev.partitions
      for ((p, i) <- prev.partitions.zipWithIndex) 
        groupArr(i).arr += p
      
     else  // no locality available, then simply split partitions based on positions in array
      for (i <- 0 until maxPartitions) //否则无本地性要求的情况下,简单的按区间进行合并
          val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
        val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
        (rangeStart until rangeEnd).foreach j => groupArr(i).arr += prev.partitions(j) 
      
    
   else //遍历父rdd的分区,且之前没有被分配过,则进行分配
    for (p <- prev.partitions if (!initialHash.contains(p)))  // throw every partition into group
      //选择某个PartitionGroup,然后添加至arr
pickBin(p).arr += p
    
  

那么pickBin是如何计算的呢?且看:

/**
 * Takes a parent RDD partition and decides which of the partition groups to put it in
 * Takes locality into account, but also uses power of 2 choices to load balance
 * It strikes a balance between the two use the balanceSlack variable
 * @param p partition (ball to be thrown)
 * @return partition group (bin to be put in)
 */
  def pickBin(p: Partition): PartitionGroup = 
//获取父rdd的该Partition的本地性所在主机的列表,并按其包含的分区数目从少到多排序
    val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
//如果没有列表,则返回none,如果有,则返回最少的那个主机名的PartitionGroup
  val prefPart = if (pref == Nil) None else pref.head
    //随机选择2个PartitionGroup中包含分区数目最小的PartitionGroup
  val r1 = rnd.nextInt(groupArr.size)
  val r2 = rnd.nextInt(groupArr.size)
  val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
  if (prefPart.isEmpty) //如果无本地性要求,则返回minPowerOfTwo
    // if no preferred locations, just use basic power of two
    return minPowerOfTwo
  
  
  val prefPartActual = prefPart.get
  //否则根据平衡因子来选择
  if (minPowerOfTwo.size + slack <= prefPartActual.size)  // more imbalance than the slack allows
    minPowerOfTwo  // prefer balance over locality
   else 
    prefPartActual // prefer locality over balance
  
 
因此合并的原则就是:
1.保证CoalescedRDD的每个分区个数相同
2.CoalescedRDD的每个分区,尽量跟它的Parent RDD的本地性相同。比如说CoalescedRDD的分区1对应于它的Parent RDD的1到10这10个分区,但是1到7这7个分区在节点1.1.1.1上,那么 CoalescedRDD的分区1所要执行的节点就是1.1.1.1。这么做的目的是为了减少节点间的数据通信,提升处理能力。
3.CoalescedRDD的分区尽量分配到不同的节点执行
比如说:
1)3个分区合并成2个分区,shuffle为true

 
 
ShuffleRDD的getPreferredLocations为Nil
 
 
2)2个分区合并成3个分区,shuffle为true

 
 
ShuffleRDD的getPreferredLocations为Nil
3)5个分区合并成3个分区,shuffle为 false,父RDD的每个分区都包含本地性

 
 
4)5个分区合并成3个分区,shuffle为 false,父RDD的每个分区不包含本地性
 
 
 
5)3个分区合并成5个分区,shuffle为 false,父RDD的每个分区都包含本地性
 
 
 
6)3个分区合并成5个分区,shuffle为 false,父RDD的每个分区不包含本地性
 
 
 

27.repartition

对RDD重分区,重分区之后的分区个数为numPartitions。
/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope 
  coalesce(numPartitions, shuffle = true)
可见其本质调用的还是coalesce,但是其shuffle参数为true,因为如果为false,则有可能获取不到指定分区个数的rdd

28.sample

Sample是对rdd中的数据集进行采样,并生成一个新的RDD,这个新的RDD只有原来RDD的部分数据,这个保留的数据集大小由fraction来进行控制. 
/**
 * Return a sampled subset of this RDD.
 *
 * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
 * @param fraction expected size of the sample as a fraction of this RDD's size
 *  without replacement: probability that each element is chosen; fraction must be [0, 1]
 *  with replacement: expected number of times each element is chosen; fraction must be >= 0
 * @param seed seed for the random number generator
 */
  def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = withScope 
  require(fraction >= 0.0, "Negative fraction value: " + fraction)
  if (withReplacement) 
    new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
   else 
    new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
  
withReplacement:这个值如果是true时,采用PoissonSampler抽样器(Poisson分布),否则使用BernoulliSampler的抽样器.
Fraction:一个大于0,小于或等于1的小数值,用于控制要读取的数据所占整个数据集的概率.
Seed:这个值如果没有传入,默认值是一个0~Long.maxvalue之间的整数.
至于那个PoissonSampler抽样器和BernoulliSampler抽样器,数学理论比较深,大家感兴趣可以百度相关资料查看其抽样原理,这里不详细叙述其抽样的内部原理。
继续看PartitionwiseSampledRDD: 
private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
    prev: RDD[T],
    sampler: RandomSampler[T, U],
    @transient preservesPartitioning: Boolean,
    @transient seed: Long = Utils.random.nextLong)
  extends RDD[U](prev) 
  @transient override val partitioner = if (preservesPartitioning) prev.partitioner else None
  override def getPartitions: Array[Partition] = 
    val random = new Random(seed)
    firstParent[T].partitions.map(x => new PartitionwiseSampledRDDPartition(x, random.nextLong()))
  
  override def getPreferredLocations(split: Partition): Seq[String] =
    firstParent[T].preferredLocations(split.asInstanceOf[PartitionwiseSampledRDDPartition].prev)
  override def compute(splitIn: Partition, context: TaskContext): Iterator[U] = 
    val split = splitIn.asInstanceOf[PartitionwiseSampledRDDPartition]
    val thisSampler = sampler.clone
    thisSampler.setSeed(split.seed)
//调用不同的抽样器针对每个分区进行抽样
    thisSampler.sample(firstParent[T].iterator(split.prev, context))
  
 

29.takeSample

takeSample函数返回一个数组,在数据集中随机采样 num 个元素组成。
/**
 * Return a fixed-size sampled subset of this RDD in an array
 *
 * @param withReplacement whether sampling is done with replacement
 * @param num size of the returned sample
 * @param seed seed for the random number generator
 * @return sample of specified size in an array
 */
// TODO: rewrite this without return statements so we can wrap it in a scope
  def takeSample(
    withReplacement: Boolean,
    num: Int,
    seed: Long = Utils.random.nextLong): Array[T] = 
  val numStDev = 10.0
  
  if (num < 0) //num为负数,直接抛异常
    throw new IllegalArgumentException("Negative number of elements requested")
   else if (num == 0) //num为0的话,直接返回空数组
    return new Array[T](0)
  
  
  val initialCount = this.count()
  if (initialCount == 0) //如果rdd为空的话,那么就直接返回空数组
      return new Array[T](0)
  
  //计算最大允许的采样个数
  val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
  if (num > maxSampleSize) 
throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +
      s"$numStDev * math.sqrt(Int.MaxValue)")
  
  
  val rand = new Random(seed)
//如果不支持重复采样,且采样总数大于rdd的个数,则直接把rdd的数据集混洗完返回
  if (!withReplacement && num >= initialCount) 
    return Utils.randomizeInPlace(this.collect(), rand)
  
  //为了利用sample方法,需要计算采样百分比
  val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
    withReplacement)
  //尝试进行第一次采样
  var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
  
  // If the first sample didn't turn out large enough, keep trying to take samples;
  // this shouldn't happen often because we use a big multiplier for the initial size
  var numIters = 0
//如果采样返回的个数不满足条件,则继续利用sample进行采样
  while (samples.length < num) 
    logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
    samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
    numIters += 1
  
  //将结果集混洗完取前num条数据
  Utils.randomizeInPlace(samples, rand).take(num)
 
 

30.randomSplit

依据所提供的权重对该RDD进行随机划分
def randomSplit(
    weights: Array[Double],
    seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope 
  val sum = weights.sum
  val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
  normalizedCumWeights.sliding(2).map  x =>
    randomSampleWithRange(x(0), x(1), seed)
  .toArray

  
  /**
 * Internal method exposed for Random Splits in DataFrames. Samples an RDD given a probability
 * range.
 * @param lb lower bound to use for the Bernoulli sampler
 * @param ub upper bound to use for the Bernoulli sampler
 * @param seed the seed for the Random number generator
 * @return A random sub-sample of the RDD without replacement.
 */
  private[spark] def randomSampleWithRange(lb: Double, ub: Double, seed: Long): RDD[T] = 
  this.mapPartitionsWithIndex(  (index, partition) =>
    val sampler = new BernoulliCellSampler[T](lb, ub)
    sampler.setSeed(seed + index)
    sampler.sample(partition)
  , preservesPartitioning = true)
 
假设按照以下进行采样:

List<Integer> data = Arrays.asList(1,2,4,3,5,6,7);

JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);

double [] weights =  1,2,3,4;

//依据所提供的权重对该RDD进行随机划分

JavaRDD<Integer> [] randomSplitRDDs = javaRDD.randomSplit(weights);
先进行权重的计算,即normalizedCumWeights=[0.0,0.1,0.3,0.6,1.0],然后调用normalizedCumWeights.sliding(2)将其两两分组,即转化为[0.0,0.1],[0.1,0.3],[0.3,0.6],[0.6,0.1],接着利用伯努利采样器进行采样,即: 
class BernoulliCellSampler[T](lb: Double, ub: Double, complement: Boolean = false)
  extends RandomSampler[T, T] 
  /** epsilon slop to avoid failure from floating point jitter. */
  require(
    lb <= (ub + RandomSampler.roundingEpsilon),
    s"Lower bound ($lb) must be <= upper bound ($ub)")
  require(
    lb >= (0.0 - RandomSampler.roundingEpsilon),
    s"Lower bound ($lb) must be >= 0.0")
  require(
    ub <= (1.0 + RandomSampler.roundingEpsilon),
    s"Upper bound ($ub) must be <= 1.0")
  private val rng: Random = new XORShiftRandom
  override def setSeed(seed: Long): Unit = rng.setSeed(seed)
//利用sample进行采样
  override def sample(items: Iterator[T]): Iterator[T] = 
    if (ub - lb <= 0.0) 
      if (complement) items else Iterator.empty
     else 
      if (complement) 
        items.filter  item => 
          val x = rng.nextDouble()
          (x < lb) || (x >= ub)
        
       else //正常走这个分之,其complement为false
//其实就是利用rng生成随机数,然后判断其范围,是否在[lb,ub)区间范围之内,是的话就保留,否则就抛弃
        items.filter  item => 
          val x = rng.nextDouble()
          (x >= lb) && (x < ub)
        
      
    
  
  
  /**
   *  Return a sampler that is the complement of the range specified of the current sampler.
   */
  def cloneComplement(): BernoulliCellSampler[T] =
    new BernoulliCellSampler[T](lb, ub, !complement)
  
  override def clone: BernoulliCellSampler[T] = new BernoulliCellSampler[T](lb, ub, complement)
 
则以上采样的结果可能为,权重大的获取到的数据量就大,但相互之间不一定成比例,只代表一种概率
scala> randomSplitRDDs (0).collect
res10: Array[Int] = Array(1, 4)
scala> randomSplitRDDs (1).collect
res11: Array[Int] = Array(3)                                                    
scala> randomSplitRDDs (2).collect
res12: Array[Int] = Array(5, 9)
scala> randomSplitRDDs (3).collect
res13: Array[Int] = Array(2, 6, 7, 8, 10)
 

以上是关于Spark算子执行流程详解之六的主要内容,如果未能解决你的问题,请参考以下文章

Spark算子执行流程详解之八

Spark算子执行流程详解之八

Spark算子执行流程详解之一

Spark算子执行流程详解之五

Spark算子执行流程详解之四

Spark算子执行流程详解之七