在 pyspark 中应用用户定义的聚合函数的替代方法
Posted
技术标签:
【中文标题】在 pyspark 中应用用户定义的聚合函数的替代方法【英文标题】:Alternative ways to apply a user defined aggregate function in pyspark 【发布时间】:2018-01-29 12:04:08 【问题描述】:我正在尝试将用户定义的聚合函数应用于 spark 数据帧,以应用附加平滑,请参见下面的代码:
import findspark
findspark.init()
import pyspark as ps
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, col, collect_list, concat_ws, udf
try:
sc
except NameError:
sc = ps.SparkContext()
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame([['A', 1],
['A',1],
['A',0],
['B',0],
['B',0],
['B',1]], schema=['name', 'val'])
def smooth_mean(x):
return (sum(x)+5)/(len(x)+5)
smooth_mean_udf = udf(smooth_mean)
df.groupBy('name').agg(collect_list('val').alias('val'))\
.withColumn('val', smooth_mean_udf('val')).show()
这样做有意义吗?据我了解,这不能很好地扩展,因为我使用的是udf
。我也找不到collect_list
的确切工作方式,名称中的collect
部分似乎表明数据被“收集”到边缘节点,但我假设数据被“收集”到各个节点?
提前感谢您的任何反馈。
【问题讨论】:
【参考方案1】:据我了解,这无法扩展
您的理解是正确的,这里最大的问题是collect_list
哪个is just good old groupByKey
。 Python udf
的影响要小得多,但对于简单的算术运算,使用它没有意义。
只需使用标准聚合
from pyspark.sql.functions import sum as sum_, count
(df
.groupBy("name")
.agg(((sum_("val") + 5) / (count("val") + 5)).alias("val"))
.show())
# +----+-----+
# |name| val|
# +----+-----+
# | B| 0.75|
# | A|0.875|
# +----+-----+
【讨论】:
一个后续问题;你怎么知道 collect_list 使用了groupByKey
?如果答案很长/很复杂,我可以将其作为一个新问题发布在 SO 上。以上是关于在 pyspark 中应用用户定义的聚合函数的替代方法的主要内容,如果未能解决你的问题,请参考以下文章