Apache Spark - 注册 UDF - 返回数据帧
Posted
技术标签:
【中文标题】Apache Spark - 注册 UDF - 返回数据帧【英文标题】:Apache Spark - registering a UDF - returning dataframe 【发布时间】:2016-12-20 10:24:47 【问题描述】:我有一个返回数据框的 UDF。类似下面的那个
scala> predict_churn(Vectors.dense(2.0,1.0,0.0,3.0,4.0,4.0,0.0,4.0,5.0,2.0))
res3: org.apache.spark.sql.DataFrame = [noprob: string, yesprob: string, pred: string]
scala> predict_churn(Vectors.dense(2.0,1.0,0.0,3.0,4.0,4.0,0.0,4.0,5.0,2.0)).show
+------------------+------------------+----+
| noprob| yesprob|pred|
+------------------+------------------+----+
|0.3619977592578127|0.6380022407421874| 1.0|
+------------------+------------------+----+
但是,当我尝试使用命令将其注册为 UDF 时
hiveContext.udf.register("predict_churn", outerpredict _)
我收到类似的错误
java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.DataFrame is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:715)
不支持返回数据帧。我正在使用 Spark 1.6.1 和 Scala 2.10。如果这不受支持,我该如何将多个列返回给外部程序。
谢谢
巴拉
【问题讨论】:
【参考方案1】:不支持返回数据帧
正确 - 您不能从 UDF 返回 DataFrame。 UDF 应返回可转换为支持的列类型的类型:
基元(Int、String、Boolean、...) 元组其他支持的类型 列表、数组、地图其他支持的类型 其他支持类型的案例类在您的情况下,您可以使用案例类:
case class Record(noprob: Double, yesprob: Double, pred: Double)
让您的 UDF (predict_churn
) 返回 Record
。
然后,当应用于单个记录(如 UDF 一样)时,此案例类将转换为以其成员命名的列(并具有正确的类型),从而产生一个类似于您的函数当前返回的 DataFrame。
【讨论】:
感谢您的回复。我尝试了您提出的解决方案。这就是我所做的 我的案例类如下case class Prob(noprob: String, yesprob: String, pred: String)
在函数val op = result.map(p => Prob(p(0).toString, p(1).toString,p(2).toString)) op // returning op as the output
中即使在此之后我得到一个非常相似的错误** scala> hiveContext.udf.register("predict_churn", outerpredict _) java.lang。 UnsupportedOperationException:不支持 org.apache.spark.rdd.RDD[Prob] 类型的架构 ** 请问我做错了什么..
您修改后的 UDF 现在是否返回 RDD?这不是我的意思,它应该只返回一个 Record
很抱歉我的语法不正确。如何将scala> result res13: org.apache.spark.sql.DataFrame = [noprob: string, yesprob: string, pred: string]
转换为case class Record(noprob: String, yesprob: String, pred: String)
类型的案例类谢谢
您不能/不应该将 DataFrame 转换为 Record.. 我认为您误解了 UDF 的使用 - UDF 是应用于 单行 的函数(或其列的子集)在 DataFrame 中,返回一个值,然后将其转换为不同的 Row。 UDF 应用于 DataFrame 中的每条记录(例如,df.select(myUdf($"col1"))
)以生成新的 DataFrame,但 UDF 本身在单个记录级别工作。以上是关于Apache Spark - 注册 UDF - 返回数据帧的主要内容,如果未能解决你的问题,请参考以下文章
如何在spark shell中注册Java SPark UDF?