在apache spark scala中排序和排名?

Posted

技术标签:

【中文标题】在apache spark scala中排序和排名?【英文标题】:Sorting and ranking in apache spark scala? 【发布时间】:2016-04-28 18:28:20 【问题描述】:

我想在spark中做排名,如下:

输入:

5.6
5.6
5.6
6.2
8.1
5.5
5.5

排名:

1
1
1
2
3
0
0
0

输出:

Rank Input 
0     5.5
0     5.5
1     5.6
1     5.6
1     5.6
2     6.2
3     8.1

我想知道如何在 spark 中对它们进行排序,并获得与上面列出的相同的排名。要求是:

    排名从 0 而非 1 开始 这是一个包含数百万条记录的示例案例,其中一个分区可能非常大 - 我很欣赏有关如何使用内部排序方法进行排名的建议

我想在 scala 中执行此操作。有人可以帮我写代码吗?

【问题讨论】:

您期望有多少不同的分数?数千,数百万? 是否也传递了排名,或者您是否期望排名来自输入的排序?关于将排名应用于索引的评论让我不清楚 @AlbertoBonsanto 所以有多个案例,排名全部或仅排名前 10 或 20。我需要支持所有案例。所以答案是数百万。 如果数百万,那么您必须使用sortBy :( @Bryce 抱歉。并不是说排名到索引,我的意思是说,输入值应该按排名排序,而不是像我们的例子中那样按顺序索引,它可以是 0,1,2 ...7。而是 0,0。必须支持极端情况,当两个输入冗余时,莫尔条纹不止一次会得到相同的排名。 【参考方案1】:

如果您希望只有 一些 排名,您可以首先获取所有 distinct 值,将它们收集为 List 并将它们转换为 BroadCast。下面,我展示了一个肮脏的例子,注意它不能保证输出会被排序(可能有更好的方法,但这是我想到的第一件事):

// Case 1. k is small (fits in the driver and nodes)
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2))
val distincts = rdd.distinct.collect.sortBy(x => x)
val broadcast = sc.broadcast(distincts)

val sdd = rdd.map
  case i: Int => (broadcast.value.asInstanceOf[Array[Int]].indexOf(i), i)


sdd.collect()

// Array[(Int, Int)] = Array((0,1), (0,1), (4,44), (2,4), (0,1), (3,33), (4,44), (0,1), (1,2))

在第二种方法中,我使用 Spark 的功能进行排序,在 RDD's documentation 中,您可以找到 zipWithIndexkeyBy 的工作原理。

//case 2. k is big, distinct values don't fit in the Driver.
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2))
val distincts = rdd.distinct.sortBy(x => x).zipWithIndex
rdd.keyBy(x => x)
  .join(distincts.keyBy(_._1))
  .map
    case (value: Int, (v1: Int, (v2: Int, index: Long))) => (index, value)
  .collect()

//res15: Array[(Long, Int)] = Array((3,33), (2,4), (0,1), (0,1), (0,1), (0,1), (4,44), (4,44), (1,2))

顺便说一句,我使用collect 只是为了可视化,在实际应用中你不应该使用它,除非你确定它适合驱动程序的记忆。

【讨论】:

非常感谢。它提供了预期的结果。因为我更关心性能,所以我想了解排序如何发生的内部结构。如果一个键有 100k 条记录,那么分区会很大,所以想知道 sortby 是唯一的选择还是关于使用库的任何建议。我在使用 Numpy 的 python 中使用了同样的方法,它的排序非常好。看起来类似。 问题是,如果 k 很大,spark 执行排序的方式是在分区之间移动多个值,这确实是低效的;但我会将其添加为案例 2。 目前我正在以未排序的方式获得结果。对于案例 2,我可以根据排名以排序格式获取它吗?比如 (0,1),(0,1),(0,1),(1,2), ....等等? 好吧,正如我告诉你的那样,不能保证结果是排序的,在你使用收集之后会更进一步,但我保证每个元素都有自己的排名。让我再告诉你一些事情,你看起来非常担心RDDs 上的记录位置,通常当你使用 Spark 时这不应该是一个问题(有特殊情况,如 windows 等)。如果您对答案不满意,请随时取消标记。 我对答案很满意。我只是想了解这里的情况。谢谢你的解释。欣赏它

以上是关于在apache spark scala中排序和排名?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Scala 在 Spark 中进行滑动窗口排名?

在 Bash 脚本中执行 Apache Spark (Scala) 代码

利用Scala进行自定义排序的几种方法

使用 Spark 和 Scala 进行字数统计

数据框:如何在 Scala 中分组/计数然后按计数排序

在 spark scala 中对数据框的每一列进行排序