何时使用 UDF 与 PySpark 中的函数? [复制]

Posted

技术标签:

【中文标题】何时使用 UDF 与 PySpark 中的函数? [复制]【英文标题】:When to use a UDF versus a function in PySpark? [duplicate] 【发布时间】:2019-09-26 18:46:55 【问题描述】:

我将 Spark 与 Databricks 结合使用,并具有以下代码:

def replaceBlanksWithNulls(column):
    return when(col(column) != "", col(column)).otherwise(None)

以下两个语句都有效:

x = rawSmallDf.withColumn("z", replaceBlanksWithNulls("z"))

并使用 UDF:

replaceBlanksWithNulls_Udf = udf(replaceBlanksWithNulls)
y = rawSmallDf.withColumn("z", replaceBlanksWithNulls_Udf("z"))

我不清楚documentation 什么时候应该使用一个而不是另一个,为什么?

【问题讨论】:

【参考方案1】:

UDF 本质上可以是任何类型的函数(当然也有例外) - 不必使用诸如 whencol 等 Spark 结构。通过使用 UDF replaceBlanksWithNulls函数可以写成普通的python代码:

def replaceBlanksWithNulls(s):
    return "" if s != "" else None

注册后可以在数据框列上使用:

replaceBlanksWithNulls_Udf = udf(replaceBlanksWithNulls)
y = rawSmallDf.withColumn("z", replaceBlanksWithNulls_Udf("z"))

注意:UDF 的默认返回类型是字符串。如果需要其他类型,注册时必须指定,例如

from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())

在这种情况下,列操作并不复杂,并且有 Spark 函数可以实现相同的目标(即问题中的replaceBlanksWithNulls

x = rawSmallDf.withColumn("z", when(col("z") != "", col("z")).otherwise(None))

这是总是首选,因为它允许 Spark 优化查询,参见例如Spark functions vs UDF performance?

【讨论】:

感谢您的解释 - 我发现内联编写 PySpark 代码(更高效)意味着我可以重用代码。举个简单的例子,假设我想扩展函数 replaceBlanksWithNulls 并将 NAN 或另一个值替换为 null - 而不是只在一个地方更改它,我必须找到我使用内联代码的所有地方。所以它可能更有效,但我发现它不适合重用......想法? @Rodney:我建议您在您的问题中继续使用诸如replaceBlanksWithNulls 方法之类的方法。它不是 udf,因为它返回一个可用于一个/多个列的表达式。这种方法的性能相当于“普通”内联代码,同时允许您在一个地方进行扩展。【参考方案2】:

您可以在 Spark SQL 中找到差异(如文档中所述)。例如,你可以发现,如果你写:

spark.sql("select replaceBlanksWithNulls(column_name) from dataframe")

如果您没有将函数replaceBlanksWithNulls 注册为udf,则不起作用。在 spark sql 中,我们需要知道执行的函数的返回类型。因此,我们需要将自定义函数注册为用户自定义函数(udf),以便在 spark sql 中使用。

【讨论】:

谢谢 - 你是说在 PySpark 中没有区别(例如性能相同? @Rodney 很高兴。正如Shaido的回答中提到的那样,这个问题得到了***.com/q/38296609/3768871的回答。

以上是关于何时使用 UDF 与 PySpark 中的函数? [复制]的主要内容,如果未能解决你的问题,请参考以下文章

udf(用户定义函数)如何在 pyspark 中工作?

Pyspark 中的 UDF 和 python 函数

在 scala 中编写 udf 函数并在 pyspark 作业中使用它们

Pyspark SIZE 函数本身可以工作,但在 UDF 中它没有 [关闭]

更改 DataFrame 中的列数据类型并将其传递到 UDF - PySpark

pyspark 中的 Pandas UDF