Spark Operators Series, Part 1: reduceByKey and groupByKey

Posted by Frank201608

Preface: This article was compiled by the editors of 小常识网 (cha138.com). It covers Spark Operators Series, Part 1: reduceByKey and groupByKey, and we hope it is of some reference value to you.

Let's start with some sample code to understand reduceByKey and groupByKey:

scala> val wordsRDD = sc.parallelize(Array("one", "two", "two", "three", "three", "three"),2).map(word => (word, 1))
wordsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[33] at map at <console>:24

scala> wordsRDD.collect
res37: Array[(String, Int)] = Array((one,1), (two,1), (two,1), (three,1), (three,1), (three,1))

scala> wordsRDD.foreach(x=>println(x._1+"|"+x._2))
one|1
three|1
three|1
two|1
three|1
two|1

//reduceByKey operator:
scala> wordsRDD.reduceByKey(_+_).foreach(x=>println(x._1+"|"+x._2))
two|2
one|1
three|3

scala> wordsRDD.reduceByKey(_+_)
res40: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[34] at reduceByKey at <console>:27

//groupByKey operator:
scala> wordsRDD.groupByKey().foreach(x=>println(x._1+"|"+x._2.sum))
two|2
one|1
three|3

scala> wordsRDD.groupByKey()
res41: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[35] at groupByKey at <console>:27

scala> wordsRDD.groupByKey().collect
res43: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))

scala> wordsRDD.groupByKey().foreach(println)
(two,CompactBuffer(1, 1))
(one,CompactBuffer(1))
(three,CompactBuffer(1, 1, 1))

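Analysis:

1. As the output above shows, both functions produce the correct result, but reduceByKey is better suited to large datasets.

This is because reduceByKey combines the values that share a key within each partition before any data is moved between partitions, as illustrated in the figure below:

2. When groupByKey is called, every key-value pair is moved. Sending all of this data over the network is largely unnecessary, as illustrated in the figure below:

3. With the two figures above in mind, imagine a very large dataset: the difference between reduceByKey and groupByKey becomes many times larger. In addition, when the data being moved exceeds the total memory of a single executor, Spark has to spill data to disk, which hurts performance even more. Therefore, avoid groupByKey when the goal is to aggregate the values of each key.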
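You can sketch the shuffle-volume difference without a cluster. The plain-Scala object below is an illustration, not Spark code. It assumes wordsRDD's two partitions hold one/two/two and three/three/three, which is how parallelize(..., 2) should slice these six words. It then counts the records each strategy would send across the shuffle.

```scala
object ShuffleVolumeSketch {
  // Assumed layout of wordsRDD: parallelize(..., 2) splits the six pairs 3/3.
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("one", 1), ("two", 1), ("two", 1)),
    Seq(("three", 1), ("three", 1), ("three", 1))
  )

  // groupByKey: no map-side combine, so every (key, value) pair is shuffled.
  val groupByKeyRecords: Int = partitions.map(_.size).sum

  // reduceByKey: each partition first sums its values per key (the "combiner"),
  // so at most one record per distinct key per partition is shuffled.
  val locallyCombined: Seq[Map[String, Int]] = partitions.map { part =>
    part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }
  }
  val reduceByKeyRecords: Int = locallyCombined.map(_.size).sum

  // Reduce side: merge the partial sums that arrive from all partitions.
  val totals: Map[String, Int] =
    locallyCombined.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    println(s"groupByKey shuffles $groupByKeyRecords records")   // 6
    println(s"reduceByKey shuffles $reduceByKeyRecords records") // 3
    println(totals)
  }
}
```

With only six records the saving is small. The same per-partition combining, however, happens in every partition of a large dataset.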

Operator source code analysis:

1. reduceByKey source code

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
   */
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

2. groupByKey source code

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD with the existing partitioner/parallelism level. The ordering of elements
   * within each group is not guaranteed, and may even differ each time the resulting RDD is
   * evaluated.
   *
   * Note: This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
   */
  def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))
  }

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD with into `numPartitions` partitions. The ordering of elements within
   * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
   *
   * Note: This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
   *
   * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
   * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
   */
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(new HashPartitioner(numPartitions))
  }

  /**
   * Group the values for each key in the RDD into a single sequence. Allows controlling the
   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
   * The ordering of elements within each group is not guaranteed, and may even differ
   * each time the resulting RDD is evaluated.
   *
   * Note: This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
   *
   * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
   * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
   */
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }
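The source comment above argues that map-side combining would not help groupByKey. A small plain-Scala sketch shows why. It is a hypothetical illustration, not Spark's implementation, and it uses an assumed two-partition layout of wordsRDD. Packing each partition's values into one buffer per key reduces the number of records (here from 6 to 3) but not the number of values that still have to be shipped.

```scala
object GroupByKeyCombineSketch {
  // Assumed layout of wordsRDD: a 3/3 split into two partitions.
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("one", 1), ("two", 1), ("two", 1)),
    Seq(("three", 1), ("three", 1), ("three", 1))
  )

  // Hypothetical map-side "combine" for grouping: one buffer of values per key per partition.
  val buffered: Seq[Map[String, Seq[Int]]] =
    partitions.map(_.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) })

  val recordsWithoutCombine: Int = partitions.map(_.size).sum              // one record per pair
  val recordsWithCombine: Int = buffered.map(_.size).sum                   // one record per key per partition
  val valuesWithCombine: Int = buffered.flatMap(_.values).map(_.size).sum  // every value is still shipped
}
```

Because the total number of values is unchanged, the extra hash-table work on the map side buys nothing. That is the trade-off the comment describes.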

3. The source code above shows that both operators call combineByKeyWithClassTag. Here is the definition of combineByKeyWithClassTag:

  /**
   * :: Experimental ::
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
   * Note that V and C can be different -- for example, one might group an RDD of type
   * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
   *
   *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   *  - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD, and whether to perform
   * map-side aggregation (if a mapper can produce multiple items with the same key).
   */
  @Experimental
  def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

The source of combineByKeyWithClassTag shows that three of its parameters are themselves functions:

createCombiner: V => C
mergeValue: (C, V) => C
mergeCombiners: (C, C) => C

createCombiner: V => C turns a value into a combiner. It is called the first time a key is seen in a partition. It can do initialization-style work on that value (for example, a type conversion) before returning it.
mergeValue: (C, V) => C merges a further value V into the combiner C that createCombiner started. This happens within each partition.
mergeCombiners: (C, C) => C merges two combiners C. This happens across partitions and combines the partial results that different partitions produced for the same key.
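Here is a minimal single-process sketch of this three-function contract. The combineByKey function below is a hypothetical local stand-in that works on in-memory partitions (Seq[Seq[(K, V)]]); it is not Spark's API. It models only the mapSideCombine flag and ignores partitioners, serializers and spilling. reduceByKey and groupByKey are then expressed with the same function arguments their source passes to combineByKeyWithClassTag, with Seq standing in for CompactBuffer.

```scala
object CombineByKeySketch {
  type Partition[K, V] = Seq[(K, V)]

  // Hypothetical local stand-in for combineByKeyWithClassTag's three-function contract.
  def combineByKey[K, V, C](
      partitions: Seq[Partition[K, V]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      mapSideCombine: Boolean = true): Map[K, C] = {

    // Inside one partition: the first value of a key goes through createCombiner,
    // later values of that key go through mergeValue.
    def combineWithin(pairs: Seq[(K, V)]): Map[K, C] =
      pairs.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))
          case None    => acc.updated(k, createCombiner(v))
        }
      }

    if (mapSideCombine) {
      // Combine inside each partition, then merge the partial combiners across partitions.
      partitions.map(combineWithin).foldLeft(Map.empty[K, C]) { (acc, partial) =>
        partial.foldLeft(acc) { case (merged, (k, c)) =>
          merged.updated(k, merged.get(k).map(mergeCombiners(_, c)).getOrElse(c))
        }
      }
    } else {
      // No map-side combine: every raw pair is "shuffled" and combined in one place,
      // so mergeCombiners is not needed in this simplified model.
      combineWithin(partitions.flatten)
    }
  }

  val data: Seq[Partition[String, Int]] = Seq(
    Seq(("one", 1), ("two", 1), ("two", 1)),
    Seq(("three", 1), ("three", 1), ("three", 1))
  )

  // reduceByKey(_ + _): identity createCombiner, and func used for both merges.
  val reduced: Map[String, Int] =
    combineByKey[String, Int, Int](data, v => v, _ + _, _ + _)

  // groupByKey(): one-element buffer, append, concatenate; map-side combine disabled.
  val grouped: Map[String, Seq[Int]] =
    combineByKey[String, Int, Seq[Int]](data, v => Seq(v), _ :+ _, _ ++ _, mapSideCombine = false)
}
```

The only differences between the two operators are the three functions and the mapSideCombine flag. Everything else in the pipeline is shared.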

4. combineByKey, a simplified version of combineByKeyWithClassTag, is defined as follows:

  /**
   * Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
   * existing partitioner/parallelism level. This method is here for backward compatibility. It
   * does not provide combiner classtag information to the shuffle.
   *
   * @see [[combineByKeyWithClassTag]]
   */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
  }

  /**
   * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
   * This method is here for backward compatibility. It does not provide combiner
   * classtag information to the shuffle.
   *
   * @see [[combineByKeyWithClassTag]]
   */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
  }

5. combineByKey example:

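val rdd1 = sc.makeRDD(Array(("A", 1), ("A", 2), ("B", 1), ("B", 2), ("B", 3), ("B", 4), ("C", 1)))
rdd1.combineByKey(
  (v: Int) => v + "_",                       // createCombiner: first value of a key in a partition
  (c: String, v: Int) => c + "@" + v,        // mergeValue: later values within the same partition
  (c1: String, c2: String) => c1 + "$" + c2  // mergeCombiners: results from different partitions
).collect.foreach(println)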
Output in local (single-machine) mode. The exact strings depend on how the pairs are split into partitions:
(A,1_$2_)
(B,1_@2$3_$4_)
(C,1_)
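The original run does not state the partition count. The sketch below assumes five partitions, which is how sc.makeRDD(..., 5) should slice this seven-element array and is one layout that reproduces the output above. It replays the three functions in plain Scala (no Spark) and merges partitions in index order. A real shuffle does not guarantee that order.

```scala
object CombineByKeyExampleSketch {
  // Assumed layout: five partitions over the seven pairs of rdd1.
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("A", 1)),
    Seq(("A", 2)),
    Seq(("B", 1), ("B", 2)),
    Seq(("B", 3)),
    Seq(("B", 4), ("C", 1))
  )

  val createCombiner = (v: Int) => v.toString + "_"
  val mergeValue = (c: String, v: Int) => c + "@" + v
  val mergeCombiners = (c1: String, c2: String) => c1 + "$" + c2

  // Step 1, per partition: "_" marks createCombiner, "@" marks mergeValue.
  val perPartition: Seq[Seq[(String, String)]] = partitions.map { part =>
    part.foldLeft(Vector.empty[(String, String)]) { case (acc, (k, v)) =>
      acc.indexWhere(_._1 == k) match {
        case -1 => acc :+ (k -> createCombiner(v))
        case i  => acc.updated(i, k -> mergeValue(acc(i)._2, v))
      }
    }
  }

  // Step 2, across partitions in index order: "$" marks mergeCombiners.
  val merged: Map[String, String] =
    perPartition.flatten.groupBy(_._1).map { case (k, cs) => k -> cs.map(_._2).reduce(mergeCombiners) }
}
```

Read the output this way: "@" appears only for B, because ("B", 1) and ("B", 2) share a partition. Every "$" marks a merge of partial results coming from different partitions.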

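Summary:

Several functions are better choices than groupByKey:

combineByKey: use it to combine elements when the output (combiner) type may differ from the input value type.
foldByKey: merges all values of each key. It works on an RDD[(K, V)] and folds the values V together by key K. The zeroValue parameter seeds the fold: the function is first applied to zeroValue and the first value of a key in each partition, and then to that result and the key's remaining values.
foldByKey and aggregateByKey are both built on combineByKeyWithClassTag (the function behind combineByKey) with the default mapSideCombine = true. For aggregations, they can therefore replace groupByKey.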
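The sketch below illustrates the case where combineByKey's output type differs from its input type. It is plain Scala and computes a per-key average with V = Int and a (sum, count) combiner. The averaging use case and the two-partition split of rdd1's data are hypothetical. In Spark, the same three functions could be passed to rdd1.combineByKey and the result finished with mapValues.

```scala
object AverageByKeySketch {
  // Hypothetical two-partition split of rdd1's data.
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("A", 1), ("A", 2), ("B", 1)),
    Seq(("B", 2), ("B", 3), ("B", 4), ("C", 1))
  )

  // V = Int, C = (sum, count): the combiner type differs from the value type.
  val createCombiner = (v: Int) => (v, 1)
  val mergeValue = (c: (Int, Int), v: Int) => (c._1 + v, c._2 + 1)
  val mergeCombiners = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)

  // Within each partition: createCombiner for a key's first value, mergeValue afterwards.
  val perPartition: Seq[Map[String, (Int, Int)]] = partitions.map { part =>
    part.foldLeft(Map.empty[String, (Int, Int)]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v)))
    }
  }

  // Across partitions: mergeCombiners, then divide sum by count.
  val averages: Map[String, Double] =
    perPartition.flatten.groupBy(_._1).map { case (k, cs) =>
      val (sum, count) = cs.map(_._2).reduce(mergeCombiners)
      k -> sum.toDouble / count
    }
}
```

Only one (sum, count) pair per key per partition crosses the shuffle. Grouping all values first with groupByKey would ship every value.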

Appendix:

The foldByKey function:

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
   */
  def foldByKey(
      zeroValue: V,
      partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    // When deserializing, use a lazy val to create just one instance of the serializer per task
    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

    val cleanedFunc = self.context.clean(func)
    combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
      cleanedFunc, cleanedFunc, partitioner)
  }

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
   */
  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
  }

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
   */
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    foldByKey(zeroValue, defaultPartitioner(self))(func)
  }

foldByKey example:

rdd1.foldByKey(0)(_+_).collect().foreach(println)

Output:
(A,3)
(B,10)
(C,1)

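The values of each key in rdd1 are summed, with zeroValue = 0 and + as the function.
zeroValue is folded in once per key in each partition, not once per value. For example, if ("A", 1) and ("A", 2) sit in different partitions, each partition first computes 0 + 1 and 0 + 2, and the partial results are then added to give 3. Because 0 is neutral for addition, the number of times it is applied does not change the result.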
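A plain-Scala sketch can check that zeroValue enters once per key per partition. It mirrors foldByKey's call to combineByKeyWithClassTag, where zeroValue is used only inside createCombiner. The helper foldByKeyLocal and the five-partition layout of rdd1's data are assumptions for illustration. With the neutral zero 0 the sums are A = 3, B = 10, C = 1. With a non-neutral zero of 10, the result depends on how many partitions each key spans, which is why the docstring requires a zero value that "must not change the result".

```scala
object FoldByKeySketch {
  // Hypothetical local model of foldByKey: zeroValue is used only when a key is
  // first seen in a partition (createCombiner), never once per value.
  def foldByKeyLocal(partitions: Seq[Seq[(String, Int)]], zeroValue: Int)(
      func: (Int, Int) => Int): Map[String, Int] = {
    val perPartition: Seq[Map[String, Int]] = partitions.map { part =>
      part.foldLeft(Map.empty[String, Int]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).map(func(_, v)).getOrElse(func(zeroValue, v)))
      }
    }
    // mergeCombiners is func as well: combine the per-partition results of each key.
    perPartition.flatten.groupBy(_._1).map { case (k, cs) => k -> cs.map(_._2).reduce(func) }
  }

  // Assumed five-partition layout of rdd1's data.
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("A", 1)),
    Seq(("A", 2)),
    Seq(("B", 1), ("B", 2)),
    Seq(("B", 3)),
    Seq(("B", 4), ("C", 1))
  )

  val neutralZero: Map[String, Int] = foldByKeyLocal(partitions, 0)(_ + _)     // A -> 3, B -> 10, C -> 1
  val nonNeutralZero: Map[String, Int] = foldByKeyLocal(partitions, 10)(_ + _) // A -> 23, B -> 40, C -> 11
}
```

In the non-neutral case, B spans three partitions and picks up the extra 10 three times. C lives in one partition and picks it up once.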
