Pyspark - 调用 pandas_udf 时出错,结果返回 Series.interpolate()

Posted

技术标签:

【中文标题】Pyspark - 调用 pandas_udf 时出错,结果返回 Series.interpolate()【英文标题】:Pyspark - Error calling pandas_udf returning a Series.interpolate() as result 【发布时间】:2019-02-06 22:00:17 【问题描述】:

我正在尝试创建一个返回 interpolation 函数的 UDF,但该函数正在返回一个带有索引并引发异常的系列。

from pyspark.sql.types import FloatType

@F.pandas_udf(FloatType(), F.PandasUDFType.GROUPED_AGG)
def udf_interpolate(v):
  return v.interpolate('linear')

## Test data
df = spark.createDataFrame([
    ("charles", 1),
    ("charles", None),
    ("charles", 3),
], ["name", "value"])

window = Window.partitionBy('name').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('test_interp', udf_interpolate(df.value).over(window)).show()

错误信息:

pyarrow.lib.ArrowInvalid: Could not convert 0    3.0
1    2.0
2    1.0
Name: _0, dtype: float64 with type Series: tried to convert to float32

我尝试强制转换为 float32,但错误仍然存​​在。我最初的想法是因为我在“预期一个值”中返回了一个包含多个值的系列,但我不知道如何解决这个问题。

如果我改变我的函数,例如,返回一个v.mean(),效果很好。

感谢任何帮助。

谢谢。

【问题讨论】:

【参考方案1】:

GROUPED_AGG requires the UDF to return a scalar;在您的情况下,最好使用GROUPED_MAP,因为您要返回一个系列并且需要按组执行计算;本质上,您将每个名称的子数据框传递给pandas_udf,使用 pandas API 对其进行转换并返回转换后的数据框:

@F.pandas_udf(df.schema, F.PandasUDFType.GROUPED_MAP)
def udf_interpolate(g):
    return g.assign(value=g.value.interpolate('linear'))

df.groupby('name').apply(udf_interpolate).show()
+-------+-----+                                                                 
|   name|value|
+-------+-----+
|charles|    1|
|charles|    2|
|charles|    3|
+-------+-----+

【讨论】:

谢谢,@Psidom。在 Databricks 中进行测试,如果我使用 GROUPED_MAP 运行 udf_interpolate 3 次,我会得到三个不同的结果(有时是正确的结果)。你知道为什么吗? 很可能是因为您的数据框没有固有的顺序。在实践中,您应该有一个像时间戳这样的列来排序。

以上是关于Pyspark - 调用 pandas_udf 时出错,结果返回 Series.interpolate()的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中使用 pandas_udf 时无法填充数组

如何在 pyspark.sql.functions.pandas_udf 和 pyspark.sql.functions.udf 之间进行选择?

为啥我的应用程序不能以 pandas_udf 和 PySpark+Flask 开头?

带有 PySpark 2.4 的 Pandas UDF [重复]

PySpark。将 Dataframe 传递给 pandas_udf 并返回一个系列

如何在 Pyspark 中使用 @pandas_udf 返回多个数据帧?