pyspark 聚合的不同列的不同操作

Posted

技术标签:

【中文标题】pyspark 聚合的不同列的不同操作【英文标题】:different actions for different columns for pyspark aggregation 【发布时间】:2017-11-03 17:03:41 【问题描述】:

我有一个按一列分组的 pyspark 数据框,然后我想将几个不同的聚合函数(包括一些自定义函数)应用于不同的列。所以基本上我想做的是这个(我知道语法都是错误的,这只是我想做的一个例子):

fraction  = UserDefinedFunction(lambda x: sum(x)*100/count(col4),DoubleType())
exprs = x: "sum" for x in [col1,col2,col3]; x: "avg" for x in [col1,col3]; x: "fraction" for x in [col1,col2]


df1 = df.groupBy(col5).agg(*exprs)

我尝试了不同的版本,例如agg(sum(df.col1,df.col2,df.col3)avg(df.col1,df.col3)fraction(df.col1,df.col2)),但没有任何效果。

感谢您的帮助!

【问题讨论】:

【参考方案1】:

可以将聚合定义为列表,然后与星号运算符一起使用以解压缩 agg() 的列表。如果是自定义函数 - 不幸的是 pyspark 还不支持编写用户定义的聚合函数(请参阅this answer),但在您的情况下,您可以结合标准函数来实现相同的效果:

fraction = lambda col: sum(col) / count('col4')
aggs = [sum(x) for x in ['col1', 'col2', 'col3']]  \
    + [avg(x) for x in ['col1', 'col3']] \
    + [fraction(x) for x in ['col1', 'col2']]

df1.groupBy('col5').agg(*aggs).show()

【讨论】:

以上是关于pyspark 聚合的不同列的不同操作的主要内容,如果未能解决你的问题,请参考以下文章

[Pyspark]RDD常用方法总结

如何根据 PySpark 中窗口聚合的条件计算不同值?

如何通过不同级别的枢轴聚合然后在pyspark中进行内部连接?

PySpark:具有不同列的 DataFrames 的动态联合

多列上的多个聚合

Pyspark - 基于数据框中的 2 列的不同记录