在 PySpark 中使用 pandas_udf 时无法填充数组
Posted
技术标签:
【中文标题】在 PySpark 中使用 pandas_udf 时无法填充数组【英文标题】:Unable to populate array while using pandas_udf in PySpark 【发布时间】:2020-06-26 06:21:13 【问题描述】:我有一个 PySpark 数据框,就像
+---+------+------+
|key|value1|value2|
+---+------+------+
| a| 1| 0|
| a| 1| 42|
| b| 3| -1|
| b| 10| -2|
+---+------+------+
我已经定义了一个 pandas_udf 之类的 -
schema = StructType([
StructField("key", StringType())
])
arr = []
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def g(df):
k = df.key.iloc[0]
series = [d for d in df.value2]
arr.append(len(series))
print(series)
return pd.DataFrame([k])
df3.groupby("key").apply(g).collect()
print(arr)
显然,数组 arr 应该是 [2, 2],但它仍然是空的。 当我检查驱动程序日志时, print(series) 的输出看起来是正确的,但数组仍然是空的。
返回类型对我来说并不重要,因为我没有更改/处理数据,我只想将它推送到自定义类对象中。
【问题讨论】:
您可以尝试像global arr=[]
一样将arr 设为全局吗?如果不起作用,请尝试使用sc.broadcast(arr)
广播变量
【参考方案1】:
我必须为列表定义一个自定义 Accumulator 并使用它。
from pyspark.accumulators import AccumulatorParam
class ListParam(AccumulatorParam):
def zero(self, val):
return []
def addInPlace(self, val1, val2):
val1.append(val2)
return val1
【讨论】:
以上是关于在 PySpark 中使用 pandas_udf 时无法填充数组的主要内容,如果未能解决你的问题,请参考以下文章
在 PySpark 中使用 pandas_udf 时无法填充数组
在 pyspark 中使用 pandas_udf 过滤数据框