Spark UDF 到自定义排序结构数组

Posted

技术标签:

【中文标题】Spark UDF 到自定义排序结构数组【英文标题】:Spark UDF to custom sort array of structs 【发布时间】:2020-01-24 18:35:24 【问题描述】:

我正在尝试使用 UDF 根据我定义的自定义排序对结构数组进行排序。

这是我希望获得的结果类型的示例:

input_tbl
+-------+-------+------+
| id1   | id2   | num  |
+-------+-------+------+
|   1   |   2   |  1   |
|   1   |   3   | -3   |
|   1   |   4   |  2   |
+-------+-------+------+

output_tbl
+-------+-------+------+
| id1   | id2   | num  |
+-------+-------+------+
|   1   |   3   | -3   |
+-------+-------+------+

案例类和UDF的一些示例代码如下所示。

case class Score(id: String, num: Int) extends Ordered[Score] 

  def compare(that: Score): Int = 
    abs(this.num-that.num)
  


val toScoreType : UserDefinedFunction = udf((id: String, num: Int) => 
    Score(id, num)
)

val sortScoreList: UserDefinedFunction = udf((score_list: Array[Score]) => 
    score_list.sorted
)

我将 sortScore UDF 调用如下:

val temp = input_tbl
    .select('id1, toScoreType('id2, 'num).as("score"))
    .groupBy('id1)
    .agg((collect_set('score)).as("score_list"))


temp.select('id1, sortScoreList('score_list).as("result"))

但是,我收到“java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef”错误。

有人对可能导致问题的原因有任何想法吗?

【问题讨论】:

【参考方案1】:

Spark 无法将记录(结构)映射到案例类作为 UDF 的输入。实际上,您的函数 toScoreType 不会转换为案例类(检查数据架构!),在内部它只是一个结构(即 Row)。

您应该重写代码以使用单个 UDF:

val sortScoreList: UserDefinedFunction = udf((score_list: Seq[Row]) => 
  score_list.mapcase Row(id:String,num:Int) => Score(id,num).sorted
)


val temp = input_tbl
  .groupBy('id1)
  .agg((collect_set(struct('id2,'num))).as("score_list"))

temp.select('id1, sortScoreList('score_list).as("result")).show()

但这不会给出预期的结果:

+---+--------------------+
|id1|              result|
+---+--------------------+
|  1|[[2, 1], [3, -3],...|
+---+--------------------+

如果您想要一条记录,您的 UDF 应该返回 1 个案例类,例如:

val sortScoreList: UserDefinedFunction = udf((score_list: Seq[Row]) => 
  score_list.mapcase Row(id:String,num:Int) => Score(id,num).sorted.head
)

然后将您的结构转换为列:

temp.select('id1, sortScoreList('score_list).as("result"))
  .select($"id1",$"result.*")
  .show()

编辑:

为了得到你想要的结果,我会这样做:

case class Score(id: String, num: Int)

val sortScoreList: UserDefinedFunction = udf((score_list: Seq[Row]) => 
      score_list.mapcase Row(id:String,num:Int) => Score(id,num).minBy(_.num)
 )


temp.select('id1, sortScoreList('score_list).as("result"))
  .select($"id1",$"result.*")
  .show()

+---+---+---+
|id1| id|num|
+---+---+---+
|  1|  3| -3|
+---+---+---+

【讨论】:

@mk2020 添加了获得所需结果的建议

以上是关于Spark UDF 到自定义排序结构数组的主要内容,如果未能解决你的问题,请参考以下文章

从最简单的vector中sort用法到自定义比较函数comp后对结构体排序的sort算法

C 语言二级指针内存模型 ( 指针数组 | 二维数组 | 自定义二级指针 | 将 一二 模型数据拷贝到 三 模型中 并 排序 )

spark中的分区和自定义分区器中的重新分区和排序给出数组越界异常

MySQL自定义函数使用数组和排序可能吗?

spark使用啥函数来拆分排序

Spark- 自定义排序