Apache Spark - 注册 UDF - 返回数据帧

Posted

技术标签:

【中文标题】Apache Spark - 注册 UDF - 返回数据帧【英文标题】:Apache Spark - registering a UDF - returning dataframe 【发布时间】:2016-12-20 10:24:47 【问题描述】:

我有一个返回数据框的 UDF。类似下面的那个

scala> predict_churn(Vectors.dense(2.0,1.0,0.0,3.0,4.0,4.0,0.0,4.0,5.0,2.0))
res3: org.apache.spark.sql.DataFrame = [noprob: string, yesprob: string, pred: string]

scala> predict_churn(Vectors.dense(2.0,1.0,0.0,3.0,4.0,4.0,0.0,4.0,5.0,2.0)).show
+------------------+------------------+----+
|            noprob|           yesprob|pred|
+------------------+------------------+----+
|0.3619977592578127|0.6380022407421874| 1.0|
+------------------+------------------+----+

但是,当我尝试使用命令将其注册为 UDF 时

hiveContext.udf.register("predict_churn", outerpredict _)

我收到类似的错误

java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.DataFrame is not supported
            at      org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:715)

不支持返回数据帧。我正在使用 Spark 1.6.1 和 Scala 2.10。如果这不受支持,我该如何将多个列返回给外部程序。

谢谢

巴拉

【问题讨论】:

【参考方案1】:

不支持返回数据帧

正确 - 您不能从 UDF 返回 DataFrame。 UDF 应返回可转换为支持的列类型的类型:

基元(Int、String、Boolean、...) 元组其他支持的类型 列表、数组、地图其他支持的类型 其他支持类型的案例类

在您的情况下,您可以使用案例类:

case class Record(noprob: Double, yesprob: Double, pred: Double)

让您的 UDF (predict_churn) 返回 Record。 然后,当应用于单个记录(如 UDF 一样)时,此案例类将转换为以其成员命名的列(并具有正确的类型),从而产生一个类似于您的函数当前返回的 DataFrame。

【讨论】:

感谢您的回复。我尝试了您提出的解决方案。这就是我所做的 我的案例类如下case class Prob(noprob: String, yesprob: String, pred: String) 在函数val op = result.map(p => Prob(p(0).toString, p(1).toString,p(2).toString)) op // returning op as the output 中即使在此之后我得到一个非常相似的错误** scala> hiveContext.udf.register("predict_churn", outerpredict _) java.lang。 UnsupportedOperationException:不支持 org.apache.spark.rdd.RDD[Prob] 类型的架构 ** 请问我做错了什么.. 您修改后的 UDF 现在是否返回 RDD?这不是我的意思,它应该只返回一个 Record 很抱歉我的语法不正确。如何将scala> result res13: org.apache.spark.sql.DataFrame = [noprob: string, yesprob: string, pred: string] 转换为case class Record(noprob: String, yesprob: String, pred: String) 类型的案例类谢谢 您不能/不应该将 DataFrame 转换为 Record.. 我认为您误解了 UDF 的使用 - UDF 是应用于 单行 的函数(或其列的子集)在 DataFrame 中,返回一个值,然后将其转换为不同的 Row。 UDF 应用于 DataFrame 中的每条记录(例如,df.select(myUdf($"col1")))以生成新的 DataFrame,但 UDF 本身在单个记录级别工作。

以上是关于Apache Spark - 注册 UDF - 返回数据帧的主要内容,如果未能解决你的问题,请参考以下文章

如何在spark shell中注册Java SPark UDF?

Spark SQL UDF示例

在 sparkSession 上注册两个同名 udf

注册匿名类功能

Spark 2.1 UDF 未在 Spark Jar 中注册

Spark 2.1 注册UDF到functionRegistry