在 pyspark 中应用用户定义的聚合函数的替代方法

Posted

技术标签:

【中文标题】在 pyspark 中应用用户定义的聚合函数的替代方法【英文标题】:Alternative ways to apply a user defined aggregate function in pyspark 【发布时间】:2018-01-29 12:04:08 【问题描述】:

我正在尝试将用户定义的聚合函数应用于 spark 数据帧,以应用附加平滑,请参见下面的代码:

import findspark
findspark.init()
import pyspark as ps
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, col, collect_list, concat_ws, udf

try:
    sc
except NameError:
    sc = ps.SparkContext()
    sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([['A', 1],
                            ['A',1],
                            ['A',0],
                            ['B',0],
                            ['B',0],
                            ['B',1]], schema=['name', 'val'])


def smooth_mean(x):
    return (sum(x)+5)/(len(x)+5)

smooth_mean_udf = udf(smooth_mean)

df.groupBy('name').agg(collect_list('val').alias('val'))\
.withColumn('val', smooth_mean_udf('val')).show()

这样做有意义吗?据我了解,这不能很好地扩展,因为我使用的是udf。我也找不到collect_list的确切工作方式,名称中的collect部分似乎表明数据被“收集”到边缘节点,但我假设数据被“收集”到各个节点?

提前感谢您的任何反馈。

【问题讨论】:

【参考方案1】:

据我了解,这无法扩展

您的理解是正确的,这里最大的问题是collect_list 哪个is just good old groupByKey。 Python udf 的影响要小得多,但对于简单的算术运算,使用它没有意义。

只需使用标准聚合

from pyspark.sql.functions import sum as sum_, count

(df
    .groupBy("name")
    .agg(((sum_("val") + 5) / (count("val") + 5)).alias("val"))
    .show())

# +----+-----+
# |name|  val|
# +----+-----+
# |   B| 0.75|
# |   A|0.875|
# +----+-----+

【讨论】:

一个后续问题;你怎么知道 collect_list 使用了groupByKey?如果答案很长/很复杂,我可以将其作为一个新问题发布在 SO 上。

以上是关于在 pyspark 中应用用户定义的聚合函数的替代方法的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 聚合的不同列的不同操作

Pyspark 将列列表转换为聚合函数

如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?

将用户定义的函数应用于 PySpark 数据帧并返回字典

Pyspark - 一次聚合数据框的所有列[重复]

PySpark 中是不是有与 Pandas 聚合函数 any() 等效的函数?