在 PySpark Pandas UDF 中指定用户定义函数的正确方法

Posted

技术标签:

【中文标题】在 PySpark Pandas UDF 中指定用户定义函数的正确方法【英文标题】:Correct Way to Specify User-Defined Function in PySpark Pandas UDF 【发布时间】:2021-01-08 12:40:59 【问题描述】:

我使用的是 pyspark 2.4.2,所以对于这个版本,docs 可以这样做来创建一个 GROUPED_MAP:

from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],("id", "v"))

@pandas_udf(returnType="id long, v double", functionType=PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()

这可行,但您不能将 subtract_mean 作为传递给 pandas DataFrame 的普通 python 函数调用。但如果你这样做,它会起作用:

def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

sub_spark = pandas_udf(f=subtract_mean, returnType="id long, v double", functionType=PandasUDFType.GROUPED_MAP)

df.groupby("id").apply(sub_spark).show()

现在您可以通过 python 调用 subtract_mean 并传递 pandas DataFrame。如何使用注释方法做到这一点?从文档中不清楚如何做到这一点。 f参数注解了什么函数,赋予了什么函数?

【问题讨论】:

这两种方式等效于指定 UDF。装饰器方法只是一种更整洁的做事方式。装饰器后面的函数作为f 参数传递。如果你修饰了一个函数,我认为你不能访问原始的、未修饰的函数。 我害怕那个。驼峰 还有一种方法可以取回原始函数,如下所述:***.com/a/33024739/14165730。或许subtract_mean.__wrapped__ 会还给你原来未修饰的函数。 是的!这完全有效:pandas_dataframe.groupby("id").apply(subtract_mean.__wrapped__) 我建议在您的问题中使用第二种方法。使用__wrapped__ 会降低代码的可读性。 【参考方案1】:

这两种方法等效于指定 UDF。装饰器方法只是一种更整洁的做事方式。装饰器后面的函数作为f 参数传递。

如in this answer 所述,您可以使用subtract_mean.__wrapped__ 来取回原始未修饰的功能。不过,您问题中的第二种方法更具可读性。使用__wrapped__ 会降低代码的可读性。但如果只是为了单元测试,应该没问题。

【讨论】:

以上是关于在 PySpark Pandas UDF 中指定用户定义函数的正确方法的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 中的 Pandas UDF

为啥运行 pandas_udf 时 Pyspark 失败?

带有 PySpark 2.4 的 Pandas UDF [重复]

如何在 pyspark 中使用 pandas UDF 并在 StructType 中返回结果

如何在 Pyspark 中使用 @pandas_udf 返回多个数据帧?

在pyspark的pandas_udf中使用外部库