Spark Operators Series, Part 1: reduceByKey and groupByKey
Posted by Frank201608
Let's start with some example code to understand reduceByKey and groupByKey:
scala> val wordsRDD = sc.parallelize(Array("one", "two", "two", "three", "three", "three"),2).map(word => (word, 1))
wordsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[33] at map at <console>:24
scala> wordsRDD.collect
res37: Array[(String, Int)] = Array((one,1), (two,1), (two,1), (three,1), (three,1), (three,1))
scala> wordsRDD.foreach(x=>println(x._1+"|"+x._2))
one|1
three|1
three|1
two|1
three|1
two|1
// The reduceByKey operator:
scala> wordsRDD.reduceByKey(_+_).foreach(x=>println(x._1+"|"+x._2))
two|2
one|1
three|3
scala> wordsRDD.reduceByKey(_+_)
res40: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[34] at reduceByKey at <console>:27
// The groupByKey operator:
scala> wordsRDD.groupByKey().foreach(x=>println(x._1+"|"+x._2.sum))
two|2
one|1
three|3
scala> wordsRDD.groupByKey()
res41: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[35] at groupByKey at <console>:27
scala> wordsRDD.groupByKey().collect
res43: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))
scala> wordsRDD.groupByKey().foreach(println)
(two,CompactBuffer(1, 1))
(one,CompactBuffer(1))
(three,CompactBuffer(1, 1, 1))
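Analysis:
1. As the output above shows, both functions produce the correct result, but reduceByKey is better suited to large datasets. This is because reduceByKey can combine the values that share a key inside each partition before any data is moved, so only one partial result per key leaves each partition.
[Figure: data example for reduceByKey]
2. When groupByKey is called, every key-value pair is moved, and sending all of that data over the network is unnecessary when the goal is an aggregate.
[Figure: data example for groupByKey]
3. On a very large dataset, the difference between reduceByKey and groupByKey is amplified many times over. In addition, when the amount of data being moved exceeds the total memory of a single executor machine, Spark has to spill the data to disk, which hurts performance even more. So avoid groupByKey when the goal is an aggregation.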
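To make the difference concrete, the following is a minimal sketch in plain Scala (no Spark required) that counts how many records would leave the partitions under each strategy. The object name ShuffleVolumeSketch, its helper names, and the two-partition layout are assumptions made for illustration; it models the idea only and is not how Spark implements its shuffle.

```scala
object ShuffleVolumeSketch {
  // Assumed layout: the article's six (word, 1) pairs split into two partitions.
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("one", 1), ("two", 1), ("two", 1)),
    Seq(("three", 1), ("three", 1), ("three", 1))
  )

  // groupByKey-style: every pair is shuffled unchanged.
  def recordsShuffledWithoutCombine(parts: Seq[Seq[(String, Int)]]): Int =
    parts.map(_.size).sum

  // reduceByKey-style: values sharing a key are summed inside the partition first.
  def combineLocally(part: Seq[(String, Int)]): Map[String, Int] =
    part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }

  // Only one partial sum per key leaves each partition.
  def recordsShuffledWithCombine(parts: Seq[Seq[(String, Int)]]): Int =
    parts.map(p => combineLocally(p).size).sum

  // Merging the partial sums gives the same final counts either way.
  def finalCounts(parts: Seq[Seq[(String, Int)]]): Map[String, Int] =
    parts.flatMap(p => combineLocally(p).toSeq)
      .groupBy(_._1)
      .map { case (k, kvs) => k -> kvs.map(_._2).sum }
}
```

With this layout, six records are shuffled without local combining and three with it. The saving grows with the number of repeated keys per partition.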
Operator source code analysis:
1. reduceByKey source
/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
*/
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
  reduceByKey(new HashPartitioner(numPartitions), func)
}
/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
* parallelism level.
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}
2. groupByKey source
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level. The ordering of elements
* within each group is not guaranteed, and may even differ each time the resulting RDD is
* evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(defaultPartitioner(self))
}
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with into `numPartitions` partitions. The ordering of elements within
* each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(new HashPartitioner(numPartitions))
}
/**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
* The ordering of elements within each group is not guaranteed, and may even differ
* each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
3. As the source above shows, both operators call combineByKeyWithClassTag, which is defined as follows:
/**
* :: Experimental ::
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
* Note that V and C can be different -- for example, one might group an RDD of type
* (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
*
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
*
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
*/
@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
As the source of combineByKeyWithClassTag shows, three of its parameters are themselves functions:
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
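createCombiner: V => C takes the current value as its argument, can perform an initialization-like step on it (such as a type conversion), and returns the result.
mergeValue: (C, V) => C merges a value V into the existing combiner C (the one produced by createCombiner). This happens within each partition.
mergeCombiners: (C, C) => C merges two combiners C. This happens across partitions.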
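The roles of the three functions can be sketched in plain Scala over in-memory "partitions". Everything here (the object CombineByKeySketch, its method signatures, and the use of Vector in place of CompactBuffer) is a hypothetical model for illustration, not Spark's implementation. The reduceByKey and groupByKey helpers pass the same kinds of functions that the Spark source above passes to combineByKeyWithClassTag.

```scala
object CombineByKeySketch {
  def combineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Step 1, inside each partition: the first value of a key goes through
    // createCombiner, every later value is folded in with mergeValue.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        val combined = acc.get(k) match {
          case Some(c) => mergeValue(c, v)
          case None    => createCombiner(v)
        }
        acc.updated(k, combined)
      }
    }
    // Step 2, across partitions: combiners of the same key are merged.
    perPartition.foldLeft(Map.empty[K, C]) { (acc, partial) =>
      partial.foldLeft(acc) { case (merged, (k, c)) =>
        val combined = merged.get(k) match {
          case Some(prev) => mergeCombiners(prev, c)
          case None       => c
        }
        merged.updated(k, combined)
      }
    }
  }

  // reduceByKey: identity createCombiner, the same function for both merges.
  def reduceByKey[K, V](partitions: Seq[Seq[(K, V)]])(func: (V, V) => V): Map[K, V] =
    combineByKey[K, V, V](partitions, (v: V) => v, func, func)

  // groupByKey: the combiner is a buffer of values. Note that real Spark
  // disables map-side combining here; this sketch does not model that flag.
  def groupByKey[K, V](partitions: Seq[Seq[(K, V)]]): Map[K, Vector[V]] =
    combineByKey[K, V, Vector[V]](
      partitions,
      (v: V) => Vector(v),
      (buf: Vector[V], v: V) => buf :+ v,
      (a: Vector[V], b: Vector[V]) => a ++ b)

  // Assumed two-partition layout of the article's word pairs.
  val words: Seq[Seq[(String, Int)]] = Seq(
    Seq(("one", 1), ("two", 1), ("two", 1)),
    Seq(("three", 1), ("three", 1), ("three", 1)))
}
```

For example, CombineByKeySketch.reduceByKey(CombineByKeySketch.words)(_ + _) yields the per-word counts, while CombineByKeySketch.groupByKey(CombineByKeySketch.words) yields the per-word buffers of ones.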
4. combineByKey is a simplified version of combineByKeyWithClassTag. It is defined as follows:
/**
* Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level. This method is here for backward compatibility. It
* does not provide combiner classtag information to the shuffle.
*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
/**
* Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
* This method is here for backward compatibility. It does not provide combiner
* classtag information to the shuffle.
*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    numPartitions: Int): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
}
5. combineByKey usage example:
val rdd1 = sc.makeRDD(Array(("A", 1), ("A", 2), ("B", 1), ("B", 2), ("B", 3), ("B", 4), ("C", 1)))
rdd1.combineByKey(
(v: Int) => v + "_",
(c: String, v: Int) => c + "@" + v,
(c1: String, c2: String) => c1 + "$" + c2
).collect.foreach(println)
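Result when run in local (single-machine) mode; the exact strings depend on how the elements are split across partitions: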
(A,1_$2_)
(B,1_@2$3_$4_)
(C,1_)
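The separators in these strings show which function produced each join: "_" marks createCombiner, "@" marks mergeValue (same partition), and "$" marks mergeCombiners (different partitions). The trace below applies the three functions by hand in plain Scala. The five-partition layout and the left-to-right merge order are assumptions chosen because they reproduce the strings above; the actual layout of the original run is not stated, and the object name CombineByKeyTrace is hypothetical.

```scala
object CombineByKeyTrace {
  // The three functions from the example, written without Spark.
  val createCombiner: Int => String = v => s"${v}_"
  val mergeValue: (String, Int) => String = (c, v) => c + "@" + v
  val mergeCombiners: (String, String) => String = (c1, c2) => c1 + "$" + c2

  // Assumed layout: [(A,1)] [(A,2)] [(B,1),(B,2)] [(B,3)] [(B,4),(C,1)]

  // A appears alone in two partitions: two combiners, merged across partitions.
  val a: String = mergeCombiners(createCombiner(1), createCombiner(2))

  // (B,1) and (B,2) share a partition, so 2 is merged into the combiner of 1.
  val bFirstPartition: String = mergeValue(createCombiner(1), 2)
  // (B,3) and (B,4) sit in later partitions and each get their own combiner.
  val b: String =
    mergeCombiners(mergeCombiners(bFirstPartition, createCombiner(3)), createCombiner(4))

  // C appears once, so only createCombiner runs.
  val c: String = createCombiner(1)

  // For contrast: if all B values were in one partition, only "@" would appear.
  val bSinglePartition: String = Seq(2, 3, 4).foldLeft(createCombiner(1))(mergeValue)
}
```

Under the assumed layout, a, b and c evaluate to the three strings printed above, while bSinglePartition shows how the same data would look if no cross-partition merge were needed.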
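Summary:
Many functions are a better choice than groupByKey:
combineByKey can be used when combining elements, and its output type may differ from the input value type.
foldByKey operates on an RDD[(K, V)] and folds the values V together by key K. Its zeroValue parameter is first combined, through the supplied function, with the first value seen for each key in a partition to initialize the accumulation; the function is then applied to fold in the remaining values.
foldByKey merges all the values of each key. foldByKey and aggregateByKey are both implemented on top of combineByKey with mapSideCombine = true, so these functions can be used in place of groupByKey.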
Appendix:
The foldByKey function:
/**
* Merge the values for each key using an associative function and a neutral "zero value" which
* may be added to the result an arbitrary number of times, and must not change the result
* (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)
  // When deserializing, use a lazy val to create just one instance of the serializer per task
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
  val cleanedFunc = self.context.clean(func)
  combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
    cleanedFunc, cleanedFunc, partitioner)
}
/**
* Merge the values for each key using an associative function and a neutral "zero value" which
* may be added to the result an arbitrary number of times, and must not change the result
* (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
}
/**
* Merge the values for each key using an associative function and a neutral "zero value" which
* may be added to the result an arbitrary number of times, and must not change the result
* (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  foldByKey(zeroValue, defaultPartitioner(self))(func)
}
foldByKey example:
rdd1.foldByKey(0)(_+_).collect().foreach(println)
Output:
(A,3)
(B,10)
(C,1)
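* The values V of each key in rdd1 are summed. Note that zeroValue = 0 initializes the accumulation and the function is the + operation.
* For example, for ("A", 1), ("A", 2): zeroValue is first combined with the first value seen for the key in each partition (0 + 1 when both pairs share a partition), and the remaining values are then added, giving ("A", 3).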
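Because the foldByKey source above builds createCombiner as func(zeroValue, v), the zero value is merged once per key per partition rather than once per key overall. A minimal plain-Scala sketch of that behavior follows. The object FoldByKeySketch, the partition layout, and the non-neutral zero value 10 are assumptions for illustration only; this models the semantics and is not Spark code.

```scala
object FoldByKeySketch {
  def foldByKey[K, V](partitions: Seq[Seq[(K, V)]], zeroValue: V)(func: (V, V) => V): Map[K, V] = {
    // Inside each partition: the first value of a key is merged with zeroValue,
    // later values are merged with the running result.
    val perPartition: Seq[Map[K, V]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
        val merged = acc.get(k) match {
          case Some(c) => func(c, v)
          case None    => func(zeroValue, v)
        }
        acc.updated(k, merged)
      }
    }
    // Across partitions: partial results of the same key are merged with func.
    perPartition.flatMap(_.toSeq)
      .groupBy(_._1)
      .map { case (k, kvs) => k -> kvs.map(_._2).reduce(func) }
  }

  // Assumed layout of rdd1's elements across five partitions.
  val layout: Seq[Seq[(String, Int)]] = Seq(
    Seq(("A", 1)),
    Seq(("A", 2)),
    Seq(("B", 1), ("B", 2)),
    Seq(("B", 3)),
    Seq(("B", 4), ("C", 1)))

  // A neutral zero value (0 for addition) gives plain per-key sums.
  val neutral: Map[String, Int] = foldByKey(layout, 0)(_ + _)

  // A non-neutral zero value is added once per key per partition,
  // so the result changes with the layout: A -> 23, B -> 40, C -> 11 here.
  val nonNeutral: Map[String, Int] = foldByKey(layout, 10)(_ + _)
}
```

This is why the Spark documentation requires a neutral zero value: with 0 the layout does not matter, while with 10 the same data in a single partition would give different totals.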