Spark UDF 到自定义排序结构数组
Posted
技术标签:
【中文标题】Spark UDF 到自定义排序结构数组【英文标题】:Spark UDF to custom sort array of structs 【发布时间】:2020-01-24 18:35:24 【问题描述】:我正在尝试使用 UDF 根据我定义的自定义排序对结构数组进行排序。
这是我希望获得的结果类型的示例:
input_tbl
+-------+-------+------+
| id1 | id2 | num |
+-------+-------+------+
| 1 | 2 | 1 |
| 1 | 3 | -3 |
| 1 | 4 | 2 |
+-------+-------+------+
output_tbl
+-------+-------+------+
| id1 | id2 | num |
+-------+-------+------+
| 1 | 3 | -3 |
+-------+-------+------+
案例类和UDF的一些示例代码如下所示。
case class Score(id: String, num: Int) extends Ordered[Score]
def compare(that: Score): Int =
abs(this.num-that.num)
val toScoreType : UserDefinedFunction = udf((id: String, num: Int) =>
Score(id, num)
)
val sortScoreList: UserDefinedFunction = udf((score_list: Array[Score]) =>
score_list.sorted
)
我将 sortScore UDF 调用如下:
val temp = input_tbl
.select('id1, toScoreType('id2, 'num).as("score"))
.groupBy('id1)
.agg((collect_set('score)).as("score_list"))
temp.select('id1, sortScoreList('score_list).as("result"))
但是,我收到“java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef”错误。
有人对可能导致问题的原因有任何想法吗?
【问题讨论】:
【参考方案1】:Spark 无法将记录(结构)映射到案例类作为 UDF 的输入。实际上,您的函数 toScoreType
不会转换为案例类(检查数据架构!),在内部它只是一个结构(即 Row
)。
您应该重写代码以使用单个 UDF:
val sortScoreList: UserDefinedFunction = udf((score_list: Seq[Row]) =>
score_list.mapcase Row(id:String,num:Int) => Score(id,num).sorted
)
val temp = input_tbl
.groupBy('id1)
.agg((collect_set(struct('id2,'num))).as("score_list"))
temp.select('id1, sortScoreList('score_list).as("result")).show()
但这不会给出预期的结果:
+---+--------------------+
|id1| result|
+---+--------------------+
| 1|[[2, 1], [3, -3],...|
+---+--------------------+
如果您想要一条记录,您的 UDF 应该返回 1 个案例类,例如:
val sortScoreList: UserDefinedFunction = udf((score_list: Seq[Row]) =>
score_list.mapcase Row(id:String,num:Int) => Score(id,num).sorted.head
)
然后将您的结构转换为列:
temp.select('id1, sortScoreList('score_list).as("result"))
.select($"id1",$"result.*")
.show()
编辑:
为了得到你想要的结果,我会这样做:
case class Score(id: String, num: Int)
val sortScoreList: UserDefinedFunction = udf((score_list: Seq[Row]) =>
score_list.mapcase Row(id:String,num:Int) => Score(id,num).minBy(_.num)
)
temp.select('id1, sortScoreList('score_list).as("result"))
.select($"id1",$"result.*")
.show()
+---+---+---+
|id1| id|num|
+---+---+---+
| 1| 3| -3|
+---+---+---+
【讨论】:
@mk2020 添加了获得所需结果的建议以上是关于Spark UDF 到自定义排序结构数组的主要内容,如果未能解决你的问题,请参考以下文章
从最简单的vector中sort用法到自定义比较函数comp后对结构体排序的sort算法
C 语言二级指针内存模型 ( 指针数组 | 二维数组 | 自定义二级指针 | 将 一二 模型数据拷贝到 三 模型中 并 排序 )