任何人都可以在 Spark 中实现 CombineByKey() 而不是 GroupByKey() 来对元素进行分组吗?

Posted

技术标签:

【中文标题】任何人都可以在 Spark 中实现 CombineByKey() 而不是 GroupByKey() 来对元素进行分组吗?【英文标题】:Can any one implement CombineByKey() instead of GroupByKey() in Spark in order to group elements? 【发布时间】:2020-03-26 14:05:05 【问题描述】:

我正在尝试对我创建的 RDD 的元素进行分组。一种简单但昂贵的方法是使用GroupByKey()。但最近我了解到CombineByKey() 可以更有效地完成这项工作。我的RDD很简单。它看起来像这样:

(1,5)
(1,8)
(1,40)
(2,9)
(2,20)
(2,6)
val grouped_elements=first_RDD.groupByKey()..mapValues(x => x.toList)

结果是:

(1,List(5,8,40))
(2,List(9,20,6))

我想根据第一个元素(键)对它们进行分组。

有人可以帮我用CombineByKey() 功能吗?我真的被CombineByKey()搞糊涂了

【问题讨论】:

这能回答你的问题吗? Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()? 这与我的问题很接近。但不完全是我想要的。但感谢您的关注 【参考方案1】:

首先看一下 API 参考 docs

combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

所以它接受我在下面定义的三个函数

scala> val createCombiner = (v:Int) => List(v)
createCombiner: Int => List[Int] = <function1>

scala> val mergeValue = (a:List[Int], b:Int) => a.::(b)
mergeValue: (List[Int], Int) => List[Int] = <function2>

scala> val mergeCombiners = (a:List[Int],b:List[Int]) => a.++(b)
mergeCombiners: (List[Int], List[Int]) => List[Int] = <function2>

一旦你定义了这些,你就可以在你的 combineByKey 调用中使用它,如下所示

scala> val list = List((1,5),(1,8),(1,40),(2,9),(2,20),(2,6))
list: List[(Int, Int)] = List((1,5), (1,8), (1,40), (2,9), (2,20), (2,6))

scala> val temp = sc.parallelize(list)
temp: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[41] at parallelize at <console>:30

scala> temp.combineByKey(createCombiner,mergeValue, mergeCombiners).collect
res27: Array[(Int, List[Int])] = Array((1,List(8, 40, 5)), (2,List(20, 9, 6)))

请注意,我在 Spark Shell 中对此进行了尝试,因此您可以在执行的命令下方看到输出。它们将帮助您加深理解。

【讨论】:

以上是关于任何人都可以在 Spark 中实现 CombineByKey() 而不是 GroupByKey() 来对元素进行分组吗?的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark - 分组

如何在 Spark 中实现“交叉连接”?

如何在 Spark ML 中实现 Kmeans 评估器

在 spark 中实现 informatica 逻辑

在 Spark GraphX 中实现拓扑排序

如何在 spark scala 中实现 uniqueConcatenate、uniqueCount [关闭]