如何将标量 Pyspark UDF 转换为 Pandas UDF?
Posted
技术标签:
【中文标题】如何将标量 Pyspark UDF 转换为 Pandas UDF?【英文标题】:How to convert Scalar Pyspark UDF to Pandas UDF? 【发布时间】:2021-10-25 16:32:54 【问题描述】:我有一个如下的 UDF,它是一个普通的标量 Pyspark UDF:
@udf()
def redact(colVal: column, offset: int = 0):
if not colVal or not offset:
return 'X'*8
else:
charList=list(colVal)
charList[:-offset]='X'*(len(colVal)-offset)
return "".join(charList)
当我阅读时尝试将其转换为 pandas_udf 时,使用矢量化 UDF 代替标量 UDF 有显着的性能改进,但我遇到了很多与 pandas 相关的问题,但我在这方面经验不足。
请帮助我将此 UDF 转换为矢量化 Pandas UDF
【问题讨论】:
【参考方案1】:redact
函数可以包装在一个函数中,该函数将redact
应用于pd.Series
的每个项目。
需要应用柯里化,因为要传递标量
offset
值。
from pyspark.sql import functions as F
import pandas as pd
def pandas_wrapper(values: pd.Series, offset: int) -> pd.Series:
def redact(colVal: str, offset: int = 0):
if not colVal or not offset:
return 'X'*8
else:
charList=list(colVal)
charList[:-offset]='X'*(len(colVal)-offset)
return "".join(charList)
return values.apply(lambda value: redact(value, offset))
def curried_wrapper(offset: int):
return F.pandas_udf(lambda x: pandas_wrapper(x, offset), "string")
df = spark.createDataFrame([("abcdef", ), ("12yz", ), (None,)], ("data_col", ))
df.withColumn("redacted", curried_wrapper(2)(F.col("data_col"))).show()
输出
+--------+--------+
|data_col|redacted|
+--------+--------+
| abcdef| XXXXef|
| 12yz| XXyz|
| null|XXXXXXXX|
+--------+--------+
【讨论】:
这不会破坏使用矢量化 pandas udf 的目的(通过避免在单行级别应用函数来提高性能),因为它本质上是循环所有值(values.apply(lambda value: redact(value, offset))
?跨度>
是的,这是真的,我不尝试在这里对 UDF 进行矢量化,而是提出一种调用 pandas UDF 的方法(使用矢量化或非矢量化实现)。也就是说,使用 pandas_udf
即使是非向量化逻辑也可以通过克服非 pandas UDF 的序列化开销来提高性能。以上是关于如何将标量 Pyspark UDF 转换为 Pandas UDF?的主要内容,如果未能解决你的问题,请参考以下文章
PySpark 将算法转换为 UDF 并将其应用于 DataFrame
Pyspark:从 Struct 中识别 arrayType 列并调用 udf 将数组转换为字符串
如何使用PySpark将SparseVector中的前X个单词转换为字符串数组