任何人都可以在 Spark 中实现 CombineByKey() 而不是 GroupByKey() 来对元素进行分组吗?
Posted
技术标签:
【中文标题】任何人都可以在 Spark 中实现 CombineByKey() 而不是 GroupByKey() 来对元素进行分组吗?【英文标题】:Can any one implement CombineByKey() instead of GroupByKey() in Spark in order to group elements? 【发布时间】:2020-03-26 14:05:05 【问题描述】:我正在尝试对我创建的 RDD 的元素进行分组。一种简单但昂贵的方法是使用GroupByKey()
。但最近我了解到CombineByKey()
可以更有效地完成这项工作。我的RDD很简单。它看起来像这样:
(1,5)
(1,8)
(1,40)
(2,9)
(2,20)
(2,6)
val grouped_elements=first_RDD.groupByKey()..mapValues(x => x.toList)
结果是:
(1,List(5,8,40))
(2,List(9,20,6))
我想根据第一个元素(键)对它们进行分组。
有人可以帮我用CombineByKey()
功能吗?我真的被CombineByKey()
搞糊涂了
【问题讨论】:
这能回答你的问题吗? Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()? 这与我的问题很接近。但不完全是我想要的。但感谢您的关注 【参考方案1】:首先看一下 API 参考 docs
combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]
所以它接受我在下面定义的三个函数
scala> val createCombiner = (v:Int) => List(v)
createCombiner: Int => List[Int] = <function1>
scala> val mergeValue = (a:List[Int], b:Int) => a.::(b)
mergeValue: (List[Int], Int) => List[Int] = <function2>
scala> val mergeCombiners = (a:List[Int],b:List[Int]) => a.++(b)
mergeCombiners: (List[Int], List[Int]) => List[Int] = <function2>
一旦你定义了这些,你就可以在你的 combineByKey 调用中使用它,如下所示
scala> val list = List((1,5),(1,8),(1,40),(2,9),(2,20),(2,6))
list: List[(Int, Int)] = List((1,5), (1,8), (1,40), (2,9), (2,20), (2,6))
scala> val temp = sc.parallelize(list)
temp: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[41] at parallelize at <console>:30
scala> temp.combineByKey(createCombiner,mergeValue, mergeCombiners).collect
res27: Array[(Int, List[Int])] = Array((1,List(8, 40, 5)), (2,List(20, 9, 6)))
请注意,我在 Spark Shell 中对此进行了尝试,因此您可以在执行的命令下方看到输出。它们将帮助您加深理解。
【讨论】:
以上是关于任何人都可以在 Spark 中实现 CombineByKey() 而不是 GroupByKey() 来对元素进行分组吗?的主要内容,如果未能解决你的问题,请参考以下文章