在 PySpark 中使用 pandas_udf 时无法填充数组

Posted

技术标签:

【中文标题】在 PySpark 中使用 pandas_udf 时无法填充数组【英文标题】:Unable to populate array while using pandas_udf in PySpark 【发布时间】:2020-06-26 06:21:13 【问题描述】:

我有一个 PySpark 数据框,就像

+---+------+------+
|key|value1|value2|
+---+------+------+
|  a|     1|     0|
|  a|     1|    42|
|  b|     3|    -1|
|  b|    10|    -2|
+---+------+------+

我已经定义了一个 pandas_udf 之类的 -

schema = StructType([
    StructField("key", StringType())
])

arr = []
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def g(df):
    k = df.key.iloc[0]
    series = [d for d in df.value2]
    arr.append(len(series))
    print(series)
    return pd.DataFrame([k])
df3.groupby("key").apply(g).collect()
print(arr)

显然,数组 arr 应该是 [2, 2],但它仍然是空的。 当我检查驱动程序日志时, print(series) 的输出看起来是正确的,但数组仍然是空的。

返回类型对我来说并不重要,因为我没有更改/处理数据,我只想将它推送到自定义类对象中。

【问题讨论】:

您可以尝试像global arr=[] 一样将arr 设为全局吗?如果不起作用,请尝试使用sc.broadcast(arr) 广播变量 【参考方案1】:

我必须为列表定义一个自定义 Accumulator 并使用它。

from pyspark.accumulators import AccumulatorParam
class ListParam(AccumulatorParam):
    def zero(self, val):
        return []
    def addInPlace(self, val1, val2):
        val1.append(val2)
        return val1

【讨论】:

以上是关于在 PySpark 中使用 pandas_udf 时无法填充数组的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中使用 pandas_udf 时无法填充数组

在 pyspark 中使用 pandas_udf 过滤数据框

为啥运行 pandas_udf 时 Pyspark 失败?

PySpark中pandas_udf的隐式模式?

PySpark。将 Dataframe 传递给 pandas_udf 并返回一个系列

pandas_udf 与 pyspark 3.0 的 scipiy.find_peaks 结果不一致