使用 callUDF 创建链接 UDF 调用的方法

Posted

技术标签:

【中文标题】使用 callUDF 创建链接 UDF 调用的方法【英文标题】:Using callUDF to create a method that chains UDF calls 【发布时间】:2017-02-26 00:42:38 【问题描述】:

我猴子修补了org.apache.spark.sql.Column 类以添加chainUDF 方法。它适用于不带参数的 udf,我需要帮助使其对带参数的 udf 通用。

这是当前的chainUDF 方法定义。

object ColumnExt 

  implicit class ColumnMethods(c: Column) 

    def chainUDF(udfName: String): Column = 
      callUDF(udfName, c)
    

  


这是chainUDF 方法的实际应用。

def appendZ(s: String): String = 
  s"$sZ"


spark.udf.register("appendZUdf", appendZ _)

def prependA(s: String): String = 
  s"A$s"


spark.udf.register("prependAUdf", prependA _)

val hobbiesDf = Seq(
  ("dance"),
  ("sing")
).toDF("word")

val actualDf = hobbiesDf.withColumn(
  "fun",
  col("word").chainUDF("appendZUdf").chainUDF("prependAUdf")
)

我想更新chainUDF 方法定义,使其采用Column 参数的可选列表。像这样的:

def appendWord(s: String, word: String): String = 
  s"$s$word"


spark.udf.register("appendWordUdf", appendWord _)

val hobbiesDf = Seq(
  ("dance"),
  ("sing")
).toDF("word")

val actualDf = hobbiesDf.withColumn(
  "fun",
  col("word").chainUDF("appendZUdf").chainUDF("appendWordUdf", lit("cool"))
)

我认为我们需要将 chainUDF 方法定义更新为如下内容:

object ColumnExt 

  implicit class ColumnMethods(c: Column) 

    def chainUDF(udfName: String, cols: Column* = some_default_value): Column = 
      callUDF(udfName, c + cols)
    

  


我确信有一些 Scala 魔术可以实现这一点。

【问题讨论】:

【参考方案1】:

签名是:

def callUDF(udfName: String, cols: Column*): Column

所以你不需要魔法:

def chainUDF(udfName: String, cols: Column* = some_default_value): Column = 
  callUDF(udfName, c +: cols: _*)

【讨论】:

如果= some_default_value 被删除,您的答案有效。如果你能添加一些关于c +: cols: _*) 工作原理的描述,我会很棒。谢谢!

以上是关于使用 callUDF 创建链接 UDF 调用的方法的主要内容,如果未能解决你的问题,请参考以下文章

java,如何在spark 1.4.1中调用UDF [重复]

AnalysisException callUDF() inside withColumn()

使用 withColumn 和 callUDF 将列附加到数据框

在创建表期间调用 UDF

SparkSQL - 与多重比较相比,isIn()的性能

在 Spark SQL DataFrame 中的 UDF 方法中调用其他方法/变量