PySpark。将 Dataframe 传递给 pandas_udf 并返回一个系列

Posted

技术标签:

【中文标题】PySpark。将 Dataframe 传递给 pandas_udf 并返回一个系列【英文标题】:PySpark. Passing a Dataframe to a pandas_udf and returning a series 【发布时间】:2018-11-29 15:00:38 【问题描述】:

我正在使用 PySpark 的新 pandas_udf 装饰器,我试图让它接受多个列作为输入并返回一个系列作为输入,但是,我得到了一个 TypeError: Invalid argument

示例代码

@pandas_udf(df.schema, PandasUDFType.SCALAR)
def fun_function(df_in):
    df_in.loc[df_in['a'] < 0] = 0.0
    return (df_in['a'] - df_in['b']) / df_in['c']

【问题讨论】:

【参考方案1】:

A SCALAR udf 期望 pandas 系列作为输入而不是数据框。对于您的情况,无需使用 udf。裁剪后从列abc 直接计算应该可以工作:

import pyspark.sql.functions as f

df = spark.createDataFrame([[1,2,4],[-1,2,2]], ['a', 'b', 'c'])

clip = lambda x: f.when(df.a < 0, 0).otherwise(x)
df.withColumn('d', (clip(df.a) - clip(df.b)) / clip(df.c)).show()

#+---+---+---+-----+
#|  a|  b|  c|    d|
#+---+---+---+-----+
#|  1|  2|  4|-0.25|
#| -1|  2|  2| null|
#+---+---+---+-----+

如果你必须使用pandas_udf,你的返回类型必须是double,而不是df.schema,因为你只返回一个熊猫系列而不是熊猫数据框;而且您还需要将列作为系列传递给函数而不是整个数据框:

@pandas_udf('double', PandasUDFType.SCALAR)
def fun_function(a, b, c):
    clip = lambda x: x.where(a >= 0, 0)
    return (clip(a) - clip(b)) / clip(c)

df.withColumn('d', fun_function(df.a, df.b, df.c)).show()
#+---+---+---+-----+                                                             
#|  a|  b|  c|    d|
#+---+---+---+-----+
#|  1|  2|  4|-0.25|
#| -1|  2|  2| null|
#+---+---+---+-----+

【讨论】:

TypeError: pandas udf 只接受一个参数 检查你的pandas和pyarrow的版本,我可以成功得到结果。@mat77

以上是关于PySpark。将 Dataframe 传递给 pandas_udf 并返回一个系列的主要内容,如果未能解决你的问题,请参考以下文章

PySpark - 如何将列表传递给用户定义函数?

PySpark 将 Dataframe 作为额外参数传递给映射

Pyspark:如何编写复杂的 Dataframe 计算

PySpark,DataFrame 的顶部

在 Pyspark 中合并 DataFrame

无法将变量传递给 pyspark 中的 spark sql 查询