在 Spark 数据帧 udf 中,像 struct(col1,col2) 这样的函数参数的类型是啥?
Posted
技术标签:
【中文标题】在 Spark 数据帧 udf 中,像 struct(col1,col2) 这样的函数参数的类型是啥?【英文标题】:In Spark dataframe udf, what is the type of function parameters which like struct(col1,col2)?在 Spark 数据帧 udf 中,像 struct(col1,col2) 这样的函数参数的类型是什么? 【发布时间】:2018-03-10 08:49:36 【问题描述】:背景:
我有一个包含三列的数据框:id, x, y
。 x,y 是双倍的。
struct (col("x"),col("y"))
获取坐标列。
然后groupBy(col("id"))
和agg(collect_list(col("coordinate")))
所以现在 df 只有两列:id ,coordinate
。
我认为坐标的数据类型是collection.mutable.WrappedArray[(Double,Double)]
。
所以我把它传给了udf。但是,数据类型是错误的。运行代码时出现错误。我不知道为什么。 struct(col1,col2) 的真正数据类型是什么?或者有没有其他方法可以轻松得到正确答案?
这是代码:
def getMedianPoint = udf((array1: collection.mutable.WrappedArray[(Double,Double)]) =>
var l = (array1.length/2)
var c = array1(l)
val x = c._1.asInstanceOf[Double]
val y = c._2.asInstanceOf[Double]
(x,y)
)
df.withColumn("coordinate",struct(col("x"),col("y")))
.groupBy(col("id"))
.agg(collect_list("coordinate").as("coordinate")
.withColumn("median",getMedianPoint(col("coordinate")))
非常感谢!
【问题讨论】:
所以数据类型应该是 Seq[Row] 基于 Ramesh Maharjan 的回答。 【参考方案1】:我认为坐标的数据类型是collection.mutable.WrappedArray[(Double,Double)]
是的,你说的完全正确。 您在 udf 函数中定义为 dataTypes 的内容以及您作为参数传递的内容也是正确的。但是主要问题是结构列的键名。因为你一定有以下问题
由于数据类型不匹配,无法解析 'UDF(coordinate)':参数 1 需要 array> 类型,但是,'
coordinate
' 属于 array> 类型。;;
只需使用alias
将结构键重命名为
df.withColumn("coordinate",struct(col("x").as("_1"),col("y").as("_2")))
.groupBy(col("id"))
.agg(collect_list("coordinate").as("coordinate"))
.withColumn("median",getMedianPoint(col("coordinate")))
以便键名匹配。
但是
这将引发另一个问题
var c = array1(l)
原因:java.lang.ClassCastException:org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 无法转换为 scala.Tuple2
所以我建议你把udf
函数改成
import org.apache.spark.sql.functions._
def getMedianPoint = udf((array1: Seq[Row]) =>
var l = (array1.length/2)
(array1(l)(0).asInstanceOf[Double], array1(l)(1).asInstanceOf[Double])
)
这样您甚至都不需要使用alias
。所以完整的解决方案是
import org.apache.spark.sql.functions._
def getMedianPoint = udf((array1: Seq[Row]) =>
var l = (array1.length/2)
(array1(l)(0).asInstanceOf[Double], array1(l)(1).asInstanceOf[Double])
)
df.withColumn("coordinate",struct(col("x"),col("y")))
.groupBy(col("id"))
.agg(collect_list("coordinate").as("coordinate"))
.withColumn("median",getMedianPoint(col("coordinate")))
.show(false)
希望回答对你有帮助
【讨论】:
是的,我试过了。但是它抛出了一个新异常Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2 如果答案对您有帮助,请考虑接受它:) 对不起,对不起。只是有点忙。它非常有用。非常感谢!以上是关于在 Spark 数据帧 udf 中,像 struct(col1,col2) 这样的函数参数的类型是啥?的主要内容,如果未能解决你的问题,请参考以下文章
我可以将 spark 数据帧作为参数发送给 pandas UDF
如何从 Spark 数据帧中的 When 子句将多个列发送到 udf?
将 Python UDF 应用于 Spark 数据帧时出现 java.lang.IllegalArgumentException