如何将标量 Pyspark UDF 转换为 Pandas UDF?

Posted

技术标签:

【中文标题】如何将标量 Pyspark UDF 转换为 Pandas UDF?【英文标题】:How to convert Scalar Pyspark UDF to Pandas UDF? 【发布时间】:2021-10-25 16:32:54 【问题描述】:

我有一个如下的 UDF,它是一个普通的标量 Pyspark UDF:

@udf()
def redact(colVal: column, offset: int = 0):
    if not colVal or not offset:
        return 'X'*8
    else:
        charList=list(colVal)
        charList[:-offset]='X'*(len(colVal)-offset)
        return "".join(charList)

当我阅读时尝试将其转换为 pandas_udf 时,使用矢量化 UDF 代替标量 UDF 有显着的性能改进,但我遇到了很多与 pandas 相关的问题,但我在这方面经验不足。

请帮助我将此 UDF 转换为矢量化 Pandas UDF

【问题讨论】:

【参考方案1】:

redact 函数可以包装在一个函数中,该函数将redact 应用于pd.Series 的每个项目。

需要应用柯里化,因为要传递标量 offset 值。

from pyspark.sql import functions as F
import pandas as pd

def pandas_wrapper(values: pd.Series, offset: int) -> pd.Series:
    def redact(colVal: str, offset: int = 0):
        if not colVal or not offset:
            return 'X'*8
        else:
            charList=list(colVal)
            charList[:-offset]='X'*(len(colVal)-offset)
            return "".join(charList)
    return values.apply(lambda value: redact(value, offset))

def curried_wrapper(offset: int):
    return F.pandas_udf(lambda x: pandas_wrapper(x, offset), "string")

df = spark.createDataFrame([("abcdef", ), ("12yz", ), (None,)], ("data_col", ))

df.withColumn("redacted", curried_wrapper(2)(F.col("data_col"))).show()

输出

+--------+--------+
|data_col|redacted|
+--------+--------+
|  abcdef|  XXXXef|
|    12yz|    XXyz|
|    null|XXXXXXXX|
+--------+--------+

【讨论】:

这不会破坏使用矢量化 pandas udf 的目的(通过避免在单行级别应用函数来提高性能),因为它本质上是循环所有值(values.apply(lambda value: redact(value, offset))?跨度> 是的,这是真的,我不尝试在这里对 UDF 进行矢量化,而是提出一种调用 pandas UDF 的方法(使用矢量化或非矢量化实现)。也就是说,使用 pandas_udf 即使是非向量化逻辑也可以通过克服非 pandas UDF 的序列化开销来提高性能。

以上是关于如何将标量 Pyspark UDF 转换为 Pandas UDF?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 将算法转换为 UDF 并将其应用于 DataFrame

Pyspark:从 Struct 中识别 arrayType 列并调用 udf 将数组转换为字符串

如何使用PySpark将SparseVector中的前X个单词转换为字符串数组

如何在pyspark中将字符串值转换为arrayType

哪个选项使用 pyspark 提供最佳性能?使用地图进行 UDF 或 RDD 处理?

AWS Glue pyspark UDF