如何在 PySpark 中将多个参数传递给 Pandas UDF?
Posted
技术标签:
【中文标题】如何在 PySpark 中将多个参数传递给 Pandas UDF?【英文标题】:How do I pass multiple arguments to a Pandas UDF in PySpark? 【发布时间】:2021-08-26 05:54:42 【问题描述】:我正在使用以下 sn-p:
from cape_privacy.pandas.transformations import Tokenizer
max_token_len = 5
@pandas_udf("string")
def Tokenize(column: pd.Series)-> pd.Series:
tokenizer = Tokenizer(max_token_len)
return tokenizer(column)
spark_df = spark_df.withColumn("name", Tokenize("name"))
由于 Pandas UDF 仅使用 Pandas 系列,我无法在函数调用 Tokenize("name")
中传递 max_token_len
参数。
因此我必须在函数范围之外定义max_token_len
参数。
this question 中提供的解决方法并不是很有帮助。 此问题是否有其他可能的解决方法或替代方法?
请指教
【问题讨论】:
【参考方案1】:在尝试了无数方法后,我找到了一个不费吹灰之力的解决方案,如下图所示:
我创建了一个 包装器 函数 (Tokenize_wrapper
) 来包装 Pandas UDF (Tokenize_udf
) 并使用返回 Pandas UDF 的包装器函数 em> 函数调用。
def Tokenize_wrapper(column, max_token_len=10):
@pandas_udf("string")
def Tokenize_udf(column: pd.Series) -> pd.Series:
tokenizer = Tokenizer(max_token_len)
return tokenizer(column)
return Tokenize_udf(column)
df = df.withColumn("Name", Tokenize_wrapper("Name", max_token_len=5))
使用部分函数(@Vaebhav 的回答)确实使这个问题的实现变得困难。
【讨论】:
【参考方案2】:您可以通过使用partial 并在您的UDF 签名中直接指定一个额外的argument(s) 来实现此目的
数据准备
input_list = [
(1,None,111)
,(1,None,120)
,(1,None,121)
,(1,None,124)
,(1,'p1',125)
,(1,None,126)
,(1,None,146)
,(1,None,147)
]
sparkDF = sql.createDataFrame(input_list,['id','p_id','timestamp'])
sparkDF.show()
+---+----+---------+
| id|p_id|timestamp|
+---+----+---------+
| 1|null| 111|
| 1|null| 120|
| 1|null| 121|
| 1|null| 124|
| 1| p1| 125|
| 1|null| 126|
| 1|null| 146|
| 1|null| 147|
+---+----+---------+
部分
def add_constant(inp,cnst=5):
return inp + cnst
cnst_add = 10
partial_func = partial(add_constant,cnst=cnst_add)
sparkDF = sparkDF.withColumn('Constant',partial_func(F.col('timestamp')))
sparkDF.show()
+---+----+---------+----------------+
| id|p_id|timestamp|Constant_Partial|
+---+----+---------+----------------+
| 1|null| 111| 121|
| 1|null| 120| 130|
| 1|null| 121| 131|
| 1|null| 124| 134|
| 1| p1| 125| 135|
| 1|null| 126| 136|
| 1|null| 146| 156|
| 1|null| 147| 157|
+---+----+---------+----------------+
UDF 签名
cnst_add = 10
add_constant_udf = F.udf(lambda x : add_constant(x,cnst_add),IntegerType())
sparkDF = sparkDF.withColumn('Constant_UDF',add_constant_udf(F.col('timestamp')))
sparkDF.show()
+---+----+---------+------------+
| id|p_id|timestamp|Constant_UDF|
+---+----+---------+------------+
| 1|null| 111| 121|
| 1|null| 120| 130|
| 1|null| 121| 131|
| 1|null| 124| 134|
| 1| p1| 125| 135|
| 1|null| 126| 136|
| 1|null| 146| 156|
| 1|null| 147| 157|
+---+----+---------+------------+
同样,您可以如下转换您的功能 -
from functools import partial
max_token_len = 5
def Tokenize(column: pd.Series,max_token_len=10)-> pd.Series:
tokenizer = Tokenizer(max_token_len)
return tokenizer(column)
Tokenize_udf = F.udf(lambda x : Tokenize(x,max_token_len),StringType())
Tokenize_partial = partial(Tokenize,max_token_len=max_token_len)
spark_df = spark_df.withColumn("name", Tokenize_udf("name"))
spark_df = spark_df.withColumn("name", Tokenize_partial("name"))
【讨论】:
我看到您提供的答案是关于UDF
。答案是否也适用于Pandas UDF
?
F.udf
中的F
是什么?
NameError: name 'add_constant' is not defined
更新了答案,添加了缺少的函数def
import pyspark.sql.functions as F以上是关于如何在 PySpark 中将多个参数传递给 Pandas UDF?的主要内容,如果未能解决你的问题,请参考以下文章
在Python中将多个参数传递给pool.map()函数[重复]
如何使用 pySpark 决定将 numClasses 参数传递给 SPark MLlib 中的随机森林算法