如何在 Scala Spark 中对 RDD 进行排序?
Posted
技术标签:
【中文标题】如何在 Scala Spark 中对 RDD 进行排序?【英文标题】:How to sort an RDD in Scala Spark? 【发布时间】:2014-07-13 08:48:59 【问题描述】:阅读 Spark 方法 sortByKey :
sortByKey([ascending], [numTasks]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
是否可以只返回“N”个结果。所以不是返回所有结果,而是返回前 10 个。我可以将排序后的集合转换为数组并使用 take
方法,但由于这是一个 O(N) 操作,有没有更有效的方法?
【问题讨论】:
所以你知道怎么排序了,你问的是怎么取前N个。我可以建议编辑问题摘要吗? 【参考方案1】:您很可能已经阅读过源代码:
class OrderedRDDFunctions
// <snip>
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] =
val part = new RangePartitioner(numPartitions, self, ascending)
val shuffled = new ShuffledRDD[K, V, P](self, part)
shuffled.mapPartitions(iter =>
val buf = iter.toArray
if (ascending)
buf.sortWith((x, y) => x._1 < y._1).iterator
else
buf.sortWith((x, y) => x._1 > y._1).iterator
, preservesPartitioning = true)
而且,正如您所说,整个数据必须经过洗牌阶段 - 如 sn-p 所示。
但是,您对随后调用 take(K) 的担忧可能并不那么准确。此操作不会循环遍历所有 N 项:
/**
* Take the first num elements of the RDD. It works by first scanning one partition, and use the
* results from that partition to estimate the number of additional partitions needed to satisfy
* the limit.
*/
def take(num: Int): Array[T] =
那么,看来:
O(myRdd.take(K))
【讨论】:
sortByKey() 因为其他 RDD 转换需要进行惰性求值。 sortByKey.take(k) 会优化为 takeOrdered(k, func) 还是 take(k).sortByKey?认为这是懒惰评估的重点,因此可以优化物理计划。它可以在 Data Frames 中更好地实现吗? @Ruslan 我不相信这种重新排列/优化目前发生在 Spark 核心中。我只知道在 SQL/催化剂优化器中发生了类似的优化。【参考方案2】:如果您只需要前 10 名,请使用rdd.top(10)
。它避免了排序,因此速度更快。
rdd.top
使数据并行传递,将每个分区中的前 N 个收集到一个堆中,然后合并这些堆。它是一个O(rdd.count)操作。排序将是 O(rdd.count log rdd.count),并且会产生大量数据传输 - 它会进行随机排序,因此所有数据都将通过网络传输。
【讨论】:
我不知道这种方法。这是比 sort() 更好的解决方案。所以这是一个比我的问题更好的答案(尽管它可能提供一些有用的背景)。我赞成。 嗨,我有一个pairRdd
,有没有办法在这个pairRdd
中使用top
方法?例如,top(10) 将为此“pairRdd”中的每个键返回 10 个元素。我真的需要知道这一点。
不,它不是那样工作的。我建议一个单独的问题来按关键字查找前 10 名,因为这是一个更大的话题。
spark.apache.org/docs/latest/api/scala/…【参考方案3】:
另一个选项,至少从 PySpark 1.2.0 开始,是使用takeOrdered。
按升序排列:
rdd.takeOrdered(10)
按降序排列:
rdd.takeOrdered(10, lambda x: -x)
k,v 对的前 k 个值:
rdd.takeOrdered(10, lambda (k, v): -v)
【讨论】:
以上是关于如何在 Scala Spark 中对 RDD 进行排序?的主要内容,如果未能解决你的问题,请参考以下文章
Spark:scala - 如何将集合从 RDD 转换为另一个 RDD
如何知道 Spark 使用 Scala 推断出的 RDD 类型是啥