如何在 PySpark 中将多个参数传递给 Pandas UDF?

Posted

技术标签:

【中文标题】如何在 PySpark 中将多个参数传递给 Pandas UDF?【英文标题】:How do I pass multiple arguments to a Pandas UDF in PySpark? 【发布时间】:2021-08-26 05:54:42 【问题描述】:

我正在使用以下 sn-p:

from cape_privacy.pandas.transformations import Tokenizer

max_token_len = 5


@pandas_udf("string")

def Tokenize(column: pd.Series)-> pd.Series:
  tokenizer = Tokenizer(max_token_len)
  return tokenizer(column)


spark_df = spark_df.withColumn("name", Tokenize("name"))

由于 Pandas UDF 仅使用 Pandas 系列,我无法在函数调用 Tokenize("name") 中传递 max_token_len 参数。

因此我必须在函数范围之外定义max_token_len 参数。

this question 中提供的解决方法并不是很有帮助。 此问题是否有其他可能的解决方法或替代方法?

请指教

【问题讨论】:

【参考方案1】:

在尝试了无数方法后,我找到了一个不费吹灰之力的解决方案,如下图所示:

我创建了一个 包装器 函数 (Tokenize_wrapper) 来包装 Pandas UDF (Tokenize_udf) 并使用返回 Pandas UDF 的包装器函数 em> 函数调用。

def Tokenize_wrapper(column, max_token_len=10):

  @pandas_udf("string")
  def Tokenize_udf(column: pd.Series) -> pd.Series:
    tokenizer = Tokenizer(max_token_len)
    return tokenizer(column)

  return Tokenize_udf(column)



df = df.withColumn("Name", Tokenize_wrapper("Name", max_token_len=5))

使用部分函数(@Vaebhav 的回答)确实使这个问题的实现变得困难。

【讨论】:

【参考方案2】:

您可以通过使用partial 并在您的UDF 签名中直接指定一个额外的argument(s) 来实现此目的

数据准备

input_list = [
               (1,None,111)    
               ,(1,None,120)
              ,(1,None,121)
              ,(1,None,124)
              ,(1,'p1',125)
              ,(1,None,126)
              ,(1,None,146)
              ,(1,None,147)
             ]

sparkDF = sql.createDataFrame(input_list,['id','p_id','timestamp'])

sparkDF.show()

+---+----+---------+
| id|p_id|timestamp|
+---+----+---------+
|  1|null|      111|
|  1|null|      120|
|  1|null|      121|
|  1|null|      124|
|  1|  p1|      125|
|  1|null|      126|
|  1|null|      146|
|  1|null|      147|
+---+----+---------+

部分


def add_constant(inp,cnst=5):
    return inp + cnst


cnst_add = 10

partial_func = partial(add_constant,cnst=cnst_add)

sparkDF = sparkDF.withColumn('Constant',partial_func(F.col('timestamp')))
                 
sparkDF.show()

+---+----+---------+----------------+
| id|p_id|timestamp|Constant_Partial|
+---+----+---------+----------------+
|  1|null|      111|             121|
|  1|null|      120|             130|
|  1|null|      121|             131|
|  1|null|      124|             134|
|  1|  p1|      125|             135|
|  1|null|      126|             136|
|  1|null|      146|             156|
|  1|null|      147|             157|
+---+----+---------+----------------+

UDF 签名

cnst_add = 10

add_constant_udf = F.udf(lambda x : add_constant(x,cnst_add),IntegerType())


sparkDF = sparkDF.withColumn('Constant_UDF',add_constant_udf(F.col('timestamp')))

sparkDF.show()

+---+----+---------+------------+
| id|p_id|timestamp|Constant_UDF|
+---+----+---------+------------+
|  1|null|      111|         121|
|  1|null|      120|         130|
|  1|null|      121|         131|
|  1|null|      124|         134|
|  1|  p1|      125|         135|
|  1|null|      126|         136|
|  1|null|      146|         156|
|  1|null|      147|         157|
+---+----+---------+------------+

同样,您可以如下转换您的功能 -

from functools import partial

max_token_len = 5

def Tokenize(column: pd.Series,max_token_len=10)-> pd.Series:
  tokenizer = Tokenizer(max_token_len)
  return tokenizer(column)

Tokenize_udf = F.udf(lambda x : Tokenize(x,max_token_len),StringType())

Tokenize_partial = partial(Tokenize,max_token_len=max_token_len)

spark_df = spark_df.withColumn("name", Tokenize_udf("name"))
spark_df = spark_df.withColumn("name", Tokenize_partial("name"))

【讨论】:

我看到您提供的答案是关于UDF。答案是否也适用于Pandas UDF F.udf 中的F 是什么? NameError: name 'add_constant' is not defined 更新了答案,添加了缺少的函数def import pyspark.sql.functions as F

以上是关于如何在 PySpark 中将多个参数传递给 Pandas UDF?的主要内容,如果未能解决你的问题,请参考以下文章

在Python中将多个参数传递给pool.map()函数[重复]

在 Laravel 5 中将多个参数传递给控制器

如何将参数传递给不带字符串的函数(Pyspark)

如何使用 pySpark 决定将 numClasses 参数传递给 SPark MLlib 中的随机森林算法

如何在 Delphi 2010 中将参数传递给 flash 电影?

如何在 Flutter 中将参数传递给动作