如何在 Spark SQL 中将额外参数传递给 UDF?
Posted
技术标签:
【中文标题】如何在 Spark SQL 中将额外参数传递给 UDF?【英文标题】:How can I pass extra parameters to UDFs in Spark SQL? 【发布时间】:2016-02-22 05:47:53 【问题描述】:我想解析DataFrame
中的日期列,对于每个日期列,日期的分辨率可能会改变(即 2011/01/10 => 2011 /01,如果分辨率设置为“月” )。
我写了以下代码:
def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
import org.apache.spark.sql.functions._
val convertDateFunc = udf(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)
val convertDateTimeFunc = udf(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)
val allColNames = dataframe.columns
val allCols = allColNames.map(name => dataframe.col(name))
val mappedCols =
for(i <- allCols.indices) yield
schema(i) match
case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i)))
case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
case _ => allCols(i)
dataframe.select(mappedCols:_*)
但是它不起作用。看来我只能将Column
s 传递给UDF。我想知道如果我将DataFrame
转换为RDD
并在每一行上应用该函数会不会很慢。
有人知道正确的解决方案吗?谢谢!
【问题讨论】:
【参考方案1】:只需使用一点柯里化:
def convertDateFunc(resolution: DateResolutionType) = udf((x:String) =>
SparkDateTimeConverter.convertDate(x, resolution))
并按如下方式使用:
case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))
另外,您应该看看sql.functions.trunc
和sql.functions.date_format
。这些应该至少是工作的一部分,根本不使用 UDF。
注意:
在 Spark 2.2 或更高版本中,您可以使用typedLit
函数:
import org.apache.spark.sql.functions.typedLit
支持更广泛的文字,例如 Seq
或 Map
。
【讨论】:
感谢您的回答和柯里化的直觉! 我写了一篇关于如何使用柯里化创建在调用时接受额外参数的 Spark UDF 的教程。 gist.github.com/andrearota/5910b5c5ac65845f23856b2415474c38 太棒了,对 Spark 的深入了解。 是否可以使用spark.udf.register
注册currying UDF 以使其sql 可用?
有人把这个放在文档中..!【参考方案2】:
您可以使用org.apache.spark.sql.functions
中定义的lit(...)
函数创建文字Column
以传递给udf
例如:
val takeRight = udf((s: String, i: Int) => s.takeRight(i))
df.select(takeRight($"stringCol", lit(1)))
【讨论】:
谢谢,我一开始也用lit
,结果发现性能不如其他答案...以上是关于如何在 Spark SQL 中将额外参数传递给 UDF?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Javascript .filter() 方法中将额外参数传递给回调函数?
如何将一些额外的字符串参数传递给每行的 spark udf?