如何在 Spark SQL 中将额外参数传递给 UDF?

Posted

技术标签:

【中文标题】如何在 Spark SQL 中将额外参数传递给 UDF?【英文标题】:How can I pass extra parameters to UDFs in Spark SQL? 【发布时间】:2016-02-22 05:47:53 【问题描述】:

我想解析DataFrame 中的日期列,对于每个日期列,日期的分辨率可能会改变(即 2011/01/10 => 2011 /01,如果分辨率设置为“月” )。

我写了以下代码:

def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =

  import org.apache.spark.sql.functions._
  val convertDateFunc = udf(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)
  val convertDateTimeFunc = udf(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)

  val allColNames = dataframe.columns
  val allCols = allColNames.map(name => dataframe.col(name))

  val mappedCols =
  
    for(i <- allCols.indices) yield
    
      schema(i) match
      
        case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i)))
        case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
        case _ => allCols(i)
      
    
  

  dataframe.select(mappedCols:_*)


但是它不起作用。看来我只能将Columns 传递给UDF。我想知道如果我将DataFrame转换为RDD并在每一行上应用该函数会不会很慢。

有人知道正确的解决方案吗?谢谢!

【问题讨论】:

【参考方案1】:

只需使用一点柯里化:

def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => 
  SparkDateTimeConverter.convertDate(x, resolution))

并按如下方式使用:

case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))

另外,您应该看看sql.functions.truncsql.functions.date_format。这些应该至少是工作的一部分,根本不使用 UDF。

注意

在 Spark 2.2 或更高版本中,您可以使用typedLit 函数:

import org.apache.spark.sql.functions.typedLit

支持更广泛的文字,例如 SeqMap

【讨论】:

感谢您的回答和柯里化的直觉! 我写了一篇关于如何使用柯里化创建在调用时接受额外参数的 Spark UDF 的教程。 gist.github.com/andrearota/5910b5c5ac65845f23856b2415474c38 太棒了,对 Spark 的深入了解。 是否可以使用spark.udf.register 注册currying UDF 以使其sql 可用? 有人把这个放在文档中..!【参考方案2】:

您可以使用org.apache.spark.sql.functions 中定义的lit(...) 函数创建文字Column 以传递给udf

例如:

val takeRight = udf((s: String, i: Int) => s.takeRight(i))
df.select(takeRight($"stringCol", lit(1)))

【讨论】:

谢谢,我一开始也用lit,结果发现性能不如其他答案...

以上是关于如何在 Spark SQL 中将额外参数传递给 UDF?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Javascript .filter() 方法中将额外参数传递给回调函数?

如何将一些额外的字符串参数传递给每行的 spark udf?

在reactjs中将额外的参数传递给onChange Listener

如何使用 withColumn 将额外的参数传递给 UDF

如何将参数传递给 spark.sql(""" """)?

将额外参数传递给 usort 回调