如何使用scala对spark中rdd的每一行进行排序?

Posted

技术标签:

【中文标题】如何使用scala对spark中rdd的每一行进行排序?【英文标题】:How to sort each line of a rdd in spark using scala? 【发布时间】:2018-09-27 12:35:03 【问题描述】:

我的文本文件有以下数据:

10,14,16,19,52
08,09,12,20,45
55,56,70,78,53

我想按降序对每一行进行排序。我试过下面的代码

val file = sc.textFile("Maximum values").map(x=>x.split(","))
val sorted = file.sortBy(x=> -x(2).toInt)
sorted.collect()

我得到了下面的输出

[[55, 56, 70, 78, 53], [10, 14, 16, 19, 52], [08, 09, 12, 20, 45]]

上面的结果表明整个列表已经按降序排序了。但是我希望按降序对每个值进行排序

例如

[10,14,16,19,52],[08,09,12,20,45],[55,56,70,78,53]

应该是

[52,19,16,14,10],[45,20,12,09,08],[78,70,56,55,53]

请抽空回答这个问题。提前致谢。

【问题讨论】:

【参考方案1】:

检查一下。

val file = spark.sparkContext.textFile("in/sort.dat").map( x=>  val y = x.split(','); y.sorted.reverse.mkString(",")   )
file.collect.foreach(println)

EDIT1: 不同的方法如何应用于上述代码。

scala> val a = "10,14,16,19,52"
a: String = 10,14,16,19,52

scala> val b = a.split(',')
b: Array[String] = Array(10, 14, 16, 19, 52)

scala> b.sorted
res0: Array[String] = Array(10, 14, 16, 19, 52)

scala> b.sorted.reverse
res1: Array[String] = Array(52, 19, 16, 14, 10)

scala> b.sorted.reverse.mkString(",")
res2: String = 52,19,16,14,10

scala> b.sorted.reverse.mkString("*")
res3: String = 52*19*16*14*10

scala>

EDIT2:

val file = spark.sparkContext.textFile("in/sort.dat").map( x=>  val y = x.split(',').map(_.toInt); y.sorted.reverse.mkString(",")   )
file.collect.foreach(println)

【讨论】:

我是 spark 和 scala 的初学者,如果你能解释一下变量 'y' 上分隔符 "," 的用法,我会非常高兴。即 y.sorted.reverse.mkString(",") 'y' 将是字符串数组。当您使用“排序”进行排序时,它会按字母顺序排列,因此您会在数组中从最小到最大。因此,使用“reverse”方法反转数组,mkString 只是使用您在此处指定的逗号分隔符连接所有数组项。我在答案中添加了“EDIT1”以在 REPL 中显示结果。 如果你有像 "5,18,26,72,61" 这样的行,那么它将排序为 "72,61,5,26,18" .. 所以对于整数排序,那么你拆分后必须将它们转换为整数。看我的 EDIT2【参考方案2】:

这是一种方法(未经测试)

val reverseStringOrdering = Ordering[String].reverse
val file = sc.textFile("Maximum values").map(x=>x.split(",").sorted(reverseStringOrdering))
val sorted = file.sortBy(r => r, ascending = false)
sorted.collect()

【讨论】:

非常感谢。但是sortBy函数需要定义隐式排序。所以我刚刚添加了它,完美的代码如下所示。 val reverseStringOrdering = Ordering[String].reverse val file = sc.textFile("/user/rahimenzo4891/Datasets/Maximum values").map(x=>x.split(",").sorted(reverseStringOrdering)) val sorted = file.sortBy(r => r(1),ascending = true) sorted.collect() 如果您按元素 1 排序,即 r(1),则不能保证您的列表按正确的顺序排序。【参考方案3】:

Spark SQL 方式,

import org.apache.spark.sql.functions._
val df = Seq(
 ("10","14","16","19","52"),
 ("08","09","12","20","45"),
 ("55","56","70","78","53")).toDF("C1", "C2","C3","C4","C5")

 df.withColumn("sortedCol", sort_array(array("C1", "C2","C3","C4","C5"), false))
  .select("sortedCol")     
  .show()

输出

+--------------------+
|           sortedCol|
+--------------------+
|[52, 19, 16, 14, 10]|
|[45, 20, 12, 09, 08]|
|[78, 70, 56, 55, 53]|
+--------------------+

【讨论】:

以上是关于如何使用scala对spark中rdd的每一行进行排序?的主要内容,如果未能解决你的问题,请参考以下文章

将 RDD 转换为 Dataframe Spark

如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件

在 spark scala 中对数据框的每一列进行排序

如何在 Scala Spark 中对 RDD 进行排序?

将RDD的每一行中的键值对列表转换为每行中的单个键值

如何对 spark scala RDD 中的元组列表/数组执行转换?