Spark算子执行流程详解之四

Posted 亮亮-AC米兰

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark算子执行流程详解之四相关的知识,希望对你有一定的参考价值。

17.map

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))

利用映射函数f针对其父RDD的每个元素进行处理,继续往下看MapPartitionsRDD

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
   
preservesPartitioning: Boolean = false)
  extends RDD[U](prev)
  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
  override def getPartitions: Array[Partition] = firstParent[T].partitions
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

注意关注其compute函数,取得是firstParent[T].iterator,即其第一个父RDD的迭代器,因为RDD是分区的,因此其迭代器需要传入分区索引进去,即split:

final def iterator(split: Partition, context: TaskContext): Iterator[T] =
  if (storageLevel != StorageLevel.NONE) //如果是需要缓存的,则读缓存或者计算(第一次的时候)
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  else //否则的话重新计算或者读检查点的数据
    computeOrReadCheckpoint(split, context)
 

 

18.mapPartitions

针对RDD的每个分区进行处理,返回一个新的RDD
/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
  def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope 
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
 

注意preservesPartitioning参数,如果子RDD需要保留父RDD的分区信息的话,则必须设置为true,否则经过转化之后的RDD的partitioner为None了。

同时需要注意的是:map是对rdd中的每一个元素进行操作,而mapPartitions(foreachPartition)则是对rdd中的每个分区的迭代器进行操作。如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。

SparkSql或DataFrame默认会对程序进行mapPartition的优化。


19.mapPartitionsWithIndex

和mapPartitions类似,只是在转化函数中会把分区索引传入进入

/*
 * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
 * of the original partition.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),//会将分区索引index传入进来
    preservesPartitioning)

如果分区索引对于实际业务有意义的话,可以使用mapPartitionsWithIndex去进行相关转化。

20.flatMap

/**
 *  Return a new RDD by first applying a function to all elements of this
 *  RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))

flatMap是针对RDD的每个元素利用函数f生成多个元素,然后把这些结果全部串联起来,其调用的本质就是调用迭代器的flatmap函数:

def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B]
  private var cur: Iterator[B] = empty
 
def hasNext: Boolean =
    cur.hasNext || self.hasNext && cur = f(self.next).toIterator; hasNext
  def next(): B = (if (hasNext) cur else empty).next()

 

21.filter

/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = withScope
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)

filter是针对RDD的每个元素利用函数f过滤,返回过滤后的结果:

/** Returns an iterator over all the elements of this iterator that satisfy the predicate `p`.
 *  The order of the elements is preserved.
 *
 * 
@param p the predicate used to test values.
 * 
@return  an iterator which produces those values of this iterator which satisfy the predicate `p`.
 * 
@note    Reuse: $consumesAndProducesIterator
 */
def filter(p: A => Boolean): Iterator[A] = new AbstractIterator[A]
  private var hd: A = _
  private var hdDefined: Boolean = false

  def
hasNext: Boolean = hdDefined || //利用函数p过滤记录,直到找到符合条件的记录为止
    do
      if (!self.hasNext) return false
     
hd = self.next()
    while (!p(hd))
    hdDefined = true
    true
 


  def next() = if (hasNext) hdDefined = false; hd else empty.next()

 



以上是关于Spark算子执行流程详解之四的主要内容,如果未能解决你的问题,请参考以下文章

Spark算子执行流程详解之八

Spark算子执行流程详解之一

Spark算子执行流程详解之五

Spark算子执行流程详解之七

Spark算子执行流程详解之七

Spark算子执行流程详解之二