谁能给spark中的combineByKey解释清楚?

Posted

技术标签:

【中文标题】谁能给spark中的combineByKey解释清楚?【英文标题】:Who can give a clear explanation for `combineByKey` in Spark? 【发布时间】:2015-11-26 11:34:40 【问题描述】:

我正在学习spark,但是看不懂这个功能combineByKey

>>> data = sc.parallelize([("A",1),("A",2),("B",1),("B",2),("C",1)] )
>>> data.combineByKey(lambda v : str(v)+"_", lambda c, v : c+"@"+str(v), lambda c1, c2 : c1+c2).collect()

输出是:

[('A', '1_2_'), ('C', '1_'), ('B', '1_2_')]

首先,我很困惑:第二步lambda c, v : c+"@"+v中的@在哪里?我在结果中找不到@

其次,我阅读了combineByKey的函数描述,但我对算法流程感到困惑。

【问题讨论】:

@eliasah 的回答已经涵盖了所有内容。但我建议你尝试使用不同数量的分区:sc.parallelize([("A",1),("A",2),("B",1),("B",2),("C",1)], num_partitions) 使用 num_partitions=1 你会得到 [('A', '1_@2'), ('C', '1_'), ('B', '1_@2')] 【参考方案1】:

groupByKey 调用不会尝试合并/组合值,因此这是一项昂贵的操作。

因此,combineByKey 调用就是这样一种优化。使用combineByKey 时,值在每个分区合并为一个值,然后每个分区值合并为一个值。值得注意的是,组合值的类型不必与原始值的类型匹配,而且通常情况下不会匹配。 combineByKey 函数接受 3 个函数作为参数:

    创建组合器的函数。在aggregateByKey 函数中,第一个参数只是一个初始零值。在combineByKey 中,我们提供了一个函数,它将接受我们当前的值作为参数并返回我们将与其他值合并的新值。

    第二个函数是一个合并函数,它接受一个值并将其合并/组合到之前收集的值中。

    第三个函数将合并的值组合在一起。基本上,此函数采用在分区级别生成的新值并将它们组合起来,直到我们最终得到一个奇异值。

换句话说,要理解combineByKey,思考它如何处理它处理的每个元素是很有用的。当combineByKey 遍历分区中的元素时,每个元素要么有一个以前没有见过的键,要么与前一个元素具有相同的键。

如果是新元素,combineByKey 使用我们提供的函数createCombiner() 为该键上的累加器创建初始值。请务必注意,这种情况发生在第一次在每个分区中找到键时,而不是仅在第一次在 RDD 中找到键时发生。

如果它是我们之前在处理该分区时看到的值,它将改为使用提供的函数mergeValue(),以及该键的累加器的当前值和新值。

由于每个分区都是独立处理的,我们可以为同一个 key 有多个累加器。当我们合并来自每个分区的结果时,如果两个或多个分区具有相同键的累加器,我们使用用户提供的mergeCombiners() 函数合并累加器。

参考资料:

学习火花 - Chapter 4. Using combineByKey in Apache-Spark 博客条目。

【讨论】:

【参考方案2】:

'@' 只添加在每个分区中。在您的示例中,似乎每个分区中只有一个元素。 试试:

data.combineByKey(lambda v : str(v)+"_", lambda c, v : c+"@"+str(v), lambda c1, c2 : c1+'$'+c2).collect() $

看看区别

【讨论】:

【参考方案3】:

这里是 combineByKey 的一个例子。目标是找到输入数据的每个键的平均值。

scala> val kvData = Array(("a",1),("b",2),("a",3),("c",9),("b",6))
kvData: Array[(String, Int)] = Array((a,1), (b,2), (a,3), (c,9), (b,6))

scala> val kvDataDist = sc.parallelize(kvData,5)
kvDataDist: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> val keyAverages = kvDataDist.combineByKey(x=>(x,1),(a: (Int,Int),x: Int)=>(a._1+x,a._2+1),(b: (Int,Int),c: (Int,Int))=>(b._1+c._1,b._2+c._2))
keyAverages: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[4] at combineByKey at <console>:25

scala> keyAverages.collect
res0: Array[(String, (Int, Int))] = Array((c,(9,1)), (a,(4,2)), (b,(8,2)))

scala> val keyAveragesFinal = keyAverages.map(x => (x._1,x._2._1/x._2._2))
keyAveragesFinal: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:25

scala> keyAveragesFinal.collect
res1: Array[(String, Int)] = Array((c,9), (a,2), (b,4))

combineByKey 接受 3 个函数作为参数:

    函数 1 = createCombiner:在每个分区中,每个键“k”调用一次

    输入:与键“k”关联的值 输出:基于程序逻辑的任何所需输出类型“O”。此输出类型将自动进一步使用。 在此示例中,选择的输出类型是 (Int,Int)。 Pair 中的第一个元素对值求和,第二个元素跟踪构成总和的元素的数量。

    Function 2 = mergeValue : 与分区中键“k”的出现次数一样多次 - 1

    输入:(createCombiner 的输出:O,与此分区中的键“k”关联的后续值) 输出:(输出:O)

    函数 3 = mergeCombiners :调用次数与键所在的分区一样多

    输入:(分区 X 的 mergeValue 输出:O,分区 Y 的 mergeValue 输出:O) 输出:(输出:O)

【讨论】:

以上是关于谁能给spark中的combineByKey解释清楚?的主要内容,如果未能解决你的问题,请参考以下文章

spark算子:combineByKey

spark通过combineByKey算子实现条件性聚合的方法

优化 Spark combineByKey

reduceByKey 与 groupByKey 与 aggregateByKey 与 combineByKey 之间的火花差异

Spark核心RDD:combineByKey函数详解

Spark算子篇 --Spark算子之combineByKey详解