按日期排序 Spark DataFrame 列的数组

Posted

技术标签:

【中文标题】按日期排序 Spark DataFrame 列的数组【英文标题】:Sort by date an Array of a Spark DataFrame Column 【发布时间】:2016-11-14 11:10:08 【问题描述】:

我的 DataFrame 格式如下:

+---+------------------------------------------------------+
|Id |DateInfos                                             |
+---+------------------------------------------------------+
|B  |[[3, 19/06/2012-02.42.01], [4, 17/06/2012-18.22.21]]  |
|A  |[[1, 15/06/2012-18.22.16], [2, 15/06/2012-09.22.35]]  |
|C  |[[5, 14/06/2012-05.20.01]]                            |
+---+------------------------------------------------------+

我想用我的数组的第二个元素中的时间戳按日期对 DateInfos 列的每个元素进行排序

+---+------------------------------------------------------+
|Id |DateInfos                                             |
+---+------------------------------------------------------+
|B  |[[4, 17/06/2012-18.22.21], [3, 19/06/2012-02.42.01]]  |
|A  |[[2, 15/06/2012-09.22.35], [1, 15/06/2012-18.22.16]]  |
|C  |[[5, 14/06/2012-05.20.01]]                            |
+---+------------------------------------------------------+

我的 DataFrame 的架构打印如下:

root
 |-- C1: string (nullable = true)
 |-- C2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: string (nullable = false)

我假设我必须创建一个使用具有以下签名的函数的 udf:

def sort_by_date(mouvements : Array[Any]) : Array[Any]

你有什么想法吗?

【问题讨论】:

【参考方案1】:

这确实有点棘手 - 因为虽然 UDF 的输入和输出类型看起来相同,但我们不能真正这样定义它 - 因为输入实际上是 mutable.WrappedArray[Row] 而输出不能使用 Row 或否则 Spark 将无法将其解码行...

所以我们定义了一个接受mutable.WrappedArray[Row] 并返回Array[(Int, String)] 的UDF:

val sortDates = udf  arr: mutable.WrappedArray[Row] =>
  arr.map  case Row(i: Int, s: String) => (i, s) .sortBy(_._2)


val result = input.select($"Id", sortDates($"DateInfos") as "DateInfos")

result.show(truncate = false)
// +---+--------------------------------------------------+
// |Id |DateInfos                                         |
// +---+--------------------------------------------------+
// |B  |[[4,17/06/2012-18.22.21], [3,19/06/2012-02.42.01]]|
// |A  |[[2,15/06/2012-09.22.35], [1,15/06/2012-18.22.16]]|
// |C  |[[5,14/06/2012-05.20.01]]                         |
// +---+--------------------------------------------------+

【讨论】:

添加一个像这样arr.map case Row(i: Int, s: String) => (i, s) .sortBy(-_._2)这样的符号来反转排序方向

以上是关于按日期排序 Spark DataFrame 列的数组的主要内容,如果未能解决你的问题,请参考以下文章

如何按 Seq[org.apache.spark.sql.Column] 降序排序 spark DataFrame?

求教: Spark的dataframe 怎么改列的名字,比如列名 SUM(_c1) 改成c1

按日期对 Pandas DataFrame 进行分组

Spark:保存按“虚拟”列分区的 DataFrame

Spark Scala:在使用 spark 按不同日期排序后,需要获取具有 NULL 日期的记录

Pandas DataFrame 按分类列排序,但按特定类排序