pyspark 聚合的不同列的不同操作
Posted
技术标签:
【中文标题】pyspark 聚合的不同列的不同操作【英文标题】:different actions for different columns for pyspark aggregation 【发布时间】:2017-11-03 17:03:41 【问题描述】:我有一个按一列分组的 pyspark 数据框,然后我想将几个不同的聚合函数(包括一些自定义函数)应用于不同的列。所以基本上我想做的是这个(我知道语法都是错误的,这只是我想做的一个例子):
fraction = UserDefinedFunction(lambda x: sum(x)*100/count(col4),DoubleType())
exprs = x: "sum" for x in [col1,col2,col3]; x: "avg" for x in [col1,col3]; x: "fraction" for x in [col1,col2]
df1 = df.groupBy(col5).agg(*exprs)
我尝试了不同的版本,例如agg(sum(df.col1,df.col2,df.col3)
、avg(df.col1,df.col3)
、fraction(df.col1,df.col2))
,但没有任何效果。
感谢您的帮助!
【问题讨论】:
【参考方案1】:可以将聚合定义为列表,然后与星号运算符一起使用以解压缩 agg()
的列表。如果是自定义函数 - 不幸的是 pyspark 还不支持编写用户定义的聚合函数(请参阅this answer),但在您的情况下,您可以结合标准函数来实现相同的效果:
fraction = lambda col: sum(col) / count('col4')
aggs = [sum(x) for x in ['col1', 'col2', 'col3']] \
+ [avg(x) for x in ['col1', 'col3']] \
+ [fraction(x) for x in ['col1', 'col2']]
df1.groupBy('col5').agg(*aggs).show()
【讨论】:
以上是关于pyspark 聚合的不同列的不同操作的主要内容,如果未能解决你的问题,请参考以下文章
如何通过不同级别的枢轴聚合然后在pyspark中进行内部连接?