如何在 Scala Spark 中对 RDD 进行排序?

Posted

技术标签:

【中文标题】如何在 Scala Spark 中对 RDD 进行排序?【英文标题】:How to sort an RDD in Scala Spark? 【发布时间】:2014-07-13 08:48:59 【问题描述】:

阅读 Spark 方法 sortByKey :

sortByKey([ascending], [numTasks])   When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

是否可以只返回“N”个结果。所以不是返回所有结果,而是返回前 10 个。我可以将排序后的集合转换为数组并使用 take 方法,但由于这是一个 O(N) 操作,有没有更有效的方法?

【问题讨论】:

所以你知道怎么排序了,你问的是怎么取前N个。我可以建议编辑问题摘要吗? 【参考方案1】:

您很可能已经阅读过源代码:

  class OrderedRDDFunctions 
   // <snip>
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = 
    val part = new RangePartitioner(numPartitions, self, ascending)
    val shuffled = new ShuffledRDD[K, V, P](self, part)
    shuffled.mapPartitions(iter => 
      val buf = iter.toArray
      if (ascending) 
        buf.sortWith((x, y) => x._1 < y._1).iterator
       else 
        buf.sortWith((x, y) => x._1 > y._1).iterator
      
    , preservesPartitioning = true)
  

而且,正如您所说,整个数据必须经过洗牌阶段 - 如 sn-p 所示。

但是,您对随后调用 take(K) 的担忧可能并不那么准确。此操作不会循环遍历所有 N 项:

  /**
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.
   */
  def take(num: Int): Array[T] = 

那么,看来:

O(myRdd.take(K))

【讨论】:

sortByKey() 因为其他 RDD 转换需要进行惰性求值。 sortByKey.take(k) 会优化为 takeOrdered(k, func) 还是 take(k).sortByKey?认为这是懒惰评估的重点,因此可以优化物理计划。它可以在 Data Frames 中更好地实现吗? @Ruslan 我不相信这种重新排列/优化目前发生在 Spark 核心中。我只知道在 SQL/催化剂优化器中发生了类似的优化。【参考方案2】:

如果您只需要前 10 名,请使用rdd.top(10)。它避免了排序,因此速度更快。

rdd.top 使数据并行传递,将每个分区中的前 N ​​个收集到一个堆中,然后合并这些堆。它一个O(rdd.count)操作。排序将是 O(rdd.count log rdd.count),并且会产生大量数据传输 - 它会进行随机排序,因此所有数据都将通过网络传输。

【讨论】:

我不知道这种方法。这是比 sort() 更好的解决方案。所以这是一个比我的问题更好的答案(尽管它可能提供一些有用的背景)。我赞成。 嗨,我有一个pairRdd,有没有办法在这个pairRdd 中使用top 方法?例如,top(10) 将为此“pairRdd”中的每个键返回 10 个元素。我真的需要知道这一点。 不,它不是那样工作的。我建议一个单独的问题来按关键字查找前 10 名,因为这是一个更大的话题。 spark.apache.org/docs/latest/api/scala/…【参考方案3】:

另一个选项,至少从 PySpark 1.2.0 开始,是使用takeOrdered。

按升序排列:

rdd.takeOrdered(10)

按降序排列:

rdd.takeOrdered(10, lambda x: -x)

k,v 对的前 k 个值:

rdd.takeOrdered(10, lambda (k, v): -v)

【讨论】:

以上是关于如何在 Scala Spark 中对 RDD 进行排序?的主要内容,如果未能解决你的问题,请参考以下文章

Spark:scala - 如何将集合从 RDD 转换为另一个 RDD

在 spark scala 中对数据框的每一列进行排序

如何在 spark scala 中加入 2 rdd

如何知道 Spark 使用 Scala 推断出的 RDD 类型是啥

使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集

Spark程序进行单元测试-使用scala