Spark UDF 作为函数参数,UDF 不在函数范围内
Posted
技术标签:
【中文标题】Spark UDF 作为函数参数,UDF 不在函数范围内【英文标题】:Spark UDF as function parameter, UDF is not in function scope 【发布时间】:2017-02-08 21:03:13 【问题描述】:我想将一些 UDF 作为函数参数与数据帧一起传递。
执行此操作的一种方法可能是在函数中创建 UDF,但这会创建和销毁 UDF 的多个实例而不重用它,这可能不是解决此问题的最佳方法。
这是一段示例代码 -
val lkpUDF = udf(i: Int) => if (i > 0) 1 else 0
val df = inputDF1
.withColumn("new_col", lkpUDF(col("c1")))
val df2 = inputDF2.
.withColumn("new_col", lkpUDF(col("c1")))
理想情况下,我不想做上述事情,而是想做这样的事情 -
val lkpUDF = udf(i: Int) => if (i > 0) 1 else 0
def appendCols(df: DataFrame, lkpUDF: ?): DataFrame =
df
.withColumn("new_col", lkpUDF(col("c1")))
val df = appendCols(inputDF, lkpUDF)
上面的 UDF 非常简单,但在我的例子中,它可以返回原始类型或用户定义的案例类类型。任何想法/指针将不胜感激。谢谢。
【问题讨论】:
【参考方案1】:具有适当签名的函数必须是这样的:
import org.apache.spark.sql.UserDefinedFunction
def appendCols(df: DataFrame, func: UserDefinedFunction): DataFrame =
df.withColumn("new_col", func(col("col1")))
scala REPL 在返回初始化值的类型方面非常有帮助。
scala> val lkpUDF = udf(i: Int) => if (i > 0) 1 else 0
lkpUDF: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,List(IntegerType))
此外,如果您传递到 udf
包装器的函数签名包含 Any
返回类型(如果函数可以返回原始类型或用户定义的案例类,则属于这种情况), UDF 将无法编译,并出现如下异常:
java.lang.UnsupportedOperationException: Schema for type Any is not supported
【讨论】:
谢谢 septra。你是对的,我遇到了上述错误。但即使我尝试只返回一个案例类,我似乎也遇到了这个错误。 ***.com/questions/42121649/…以上是关于Spark UDF 作为函数参数,UDF 不在函数范围内的主要内容,如果未能解决你的问题,请参考以下文章
如何将 Set/HashSet 作为参数传递给 Spark 中的 UDF?
我可以将 spark 数据帧作为参数发送给 pandas UDF