Spark UDF 作为函数参数,UDF 不在函数范围内

Posted

技术标签:

【中文标题】Spark UDF 作为函数参数,UDF 不在函数范围内【英文标题】:Spark UDF as function parameter, UDF is not in function scope 【发布时间】:2017-02-08 21:03:13 【问题描述】:

我想将一些 UDF 作为函数参数与数据帧一起传递。

执行此操作的一种方法可能是在函数中创建 UDF,但这会创建和销毁 UDF 的多个实例而不重用它,这可能不是解决此问题的最佳方法。

这是一段示例代码 -

val lkpUDF = udf(i: Int) => if (i > 0) 1 else 0

val df =   inputDF1
    .withColumn("new_col", lkpUDF(col("c1")))
val df2 =   inputDF2.
  .withColumn("new_col", lkpUDF(col("c1")))

理想情况下,我不想做上述事情,而是想做这样的事情 -

val lkpUDF = udf(i: Int) => if (i > 0) 1 else 0

def appendCols(df: DataFrame, lkpUDF: ?): DataFrame = 

    df
      .withColumn("new_col", lkpUDF(col("c1")))

  
val df = appendCols(inputDF, lkpUDF)

上面的 UDF 非常简单,但在我的例子中,它可以返回原始类型或用户定义的案例类类型。任何想法/指针将不胜感激。谢谢。

【问题讨论】:

【参考方案1】:

具有适当签名的函数必须是这样的:

import org.apache.spark.sql.UserDefinedFunction

def appendCols(df: DataFrame, func: UserDefinedFunction): DataFrame = 
    df.withColumn("new_col", func(col("col1")))

scala REPL 在返回初始化值的类型方面非常有帮助。

scala> val lkpUDF = udf(i: Int) => if (i > 0) 1 else 0
lkpUDF: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,List(IntegerType))

此外,如果您传递到 udf 包装器的函数签名包含 Any 返回类型(如果函数可以返回原始类型或用户定义的案例类,则属于这种情况), UDF 将无法编译,并出现如下异常:

java.lang.UnsupportedOperationException: Schema for type Any is not supported

【讨论】:

谢谢 septra。你是对的,我遇到了上述错误。但即使我尝试只返回一个案例类,我似乎也遇到了这个错误。 ***.com/questions/42121649/…

以上是关于Spark UDF 作为函数参数,UDF 不在函数范围内的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Set/HashSet 作为参数传递给 Spark 中的 UDF?

Spark UDF 函数怎么实现参数数量变化?

我可以将 spark 数据帧作为参数发送给 pandas UDF

spark自定义UDF为啥参数最多21个

在Spark中如何使用UDO作为参数调用UDF以避免二进制错误

Spark 2.1 注册UDF到functionRegistry