是在单个 groupBy 中还是分别进行所有聚合?
Posted
技术标签:
【中文标题】是在单个 groupBy 中还是分别进行所有聚合?【英文标题】:Do all aggregations in a single groupBy or separately? 【发布时间】:2018-01-25 13:30:02 【问题描述】:我需要在我的 PySpark 代码中对大型数据集进行大量聚合(大约 9 到 10 次)。我可以通过两种方式来处理它:
单组:
df.groupBy(col1, col2).agg("col3":"sum", "col4":"avg", "col5":"min", "col6":"sum", "col7":"max", "col8":"avg", "col9":"sum")
分组并加入
temp1 = df.groupBy(col1, col2).agg("col3":"sum")
temp2 = df.groupBy(col1, col2).agg("col4":"avg")
temp3 = df.groupBy(col1, col2).agg("col5":"min")
.
.
.
temp9 = df.groupBy(col1, col2).agg("col9":"sum")
然后加入所有这 9 个数据帧以获得最终输出。
哪个效率更高?
【问题讨论】:
【参考方案1】:TL;DR 选择第一个。
这甚至不是比赛。仅可读性就足以拒绝第二种解决方案,该解决方案冗长且令人费解。
更不用说,执行计划只是一个怪物(这里只有 2 个表!):
== Physical Plan ==
*Project [col1#512L, col2#513L, sum(col3)#597L, avg(col4)#614, min(col5)#631L]
+- *SortMergeJoin [col1#512L, col2#513L], [col1#719L, col2#720L], Inner
:- *Project [col1#512L, col2#513L, sum(col3)#597L, avg(col4)#614]
: +- *SortMergeJoin [col1#512L, col2#513L], [col1#704L, col2#705L], Inner
: :- *Sort [col1#512L ASC NULLS FIRST, col2#513L ASC NULLS FIRST], false, 0
: : +- *HashAggregate(keys=[col1#512L, col2#513L], functions=[sum(col3#514L)])
: : +- Exchange hashpartitioning(col1#512L, col2#513L, 200)
: : +- *HashAggregate(keys=[col1#512L, col2#513L], functions=[partial_sum(col3#514L)])
: : +- *Project [_1#491L AS col1#512L, _2#492L AS col2#513L, _3#493L AS col3#514L]
: : +- *Filter (isnotnull(_1#491L) && isnotnull(_2#492L))
: : +- Scan ExistingRDD[_1#491L,_2#492L,_3#493L,_4#494L,_5#495L,_6#496L,_7#497L,_8#498L,_9#499L,_10#500L]
: +- *Sort [col1#704L ASC NULLS FIRST, col2#705L ASC NULLS FIRST], false, 0
: +- *HashAggregate(keys=[col1#704L, col2#705L], functions=[avg(col4#707L)])
: +- Exchange hashpartitioning(col1#704L, col2#705L, 200)
: +- *HashAggregate(keys=[col1#704L, col2#705L], functions=[partial_avg(col4#707L)])
: +- *Project [_1#491L AS col1#704L, _2#492L AS col2#705L, _4#494L AS col4#707L]
: +- *Filter (isnotnull(_2#492L) && isnotnull(_1#491L))
: +- Scan ExistingRDD[_1#491L,_2#492L,_3#493L,_4#494L,_5#495L,_6#496L,_7#497L,_8#498L,_9#499L,_10#500L]
+- *Sort [col1#719L ASC NULLS FIRST, col2#720L ASC NULLS FIRST], false, 0
+- *HashAggregate(keys=[col1#719L, col2#720L], functions=[min(col5#723L)])
+- Exchange hashpartitioning(col1#719L, col2#720L, 200)
+- *HashAggregate(keys=[col1#719L, col2#720L], functions=[partial_min(col5#723L)])
+- *Project [_1#491L AS col1#719L, _2#492L AS col2#720L, _5#495L AS col5#723L]
+- *Filter (isnotnull(_1#491L) && isnotnull(_2#492L))
+- Scan ExistingRDD[_1#491L,_2#492L,_3#493L,_4#494L,_5#495L,_6#496L,_7#497L,_8#498L,_9#499L,_10#500L]
与普通聚合相比(对于所有列):
== Physical Plan ==
*HashAggregate(keys=[col1#512L, col2#513L], functions=[max(col7#518L), avg(col8#519L), sum(col3#514L), sum(col6#517L), sum(col9#520L), min(col5#516L), avg(col4#515L)])
+- Exchange hashpartitioning(col1#512L, col2#513L, 200)
+- *HashAggregate(keys=[col1#512L, col2#513L], functions=[partial_max(col7#518L), partial_avg(col8#519L), partial_sum(col3#514L), partial_sum(col6#517L), partial_sum(col9#520L), partial_min(col5#516L), partial_avg(col4#515L)])
+- *Project [_1#491L AS col1#512L, _2#492L AS col2#513L, _3#493L AS col3#514L, _4#494L AS col4#515L, _5#495L AS col5#516L, _6#496L AS col6#517L, _7#497L AS col7#518L, _8#498L AS col8#519L, _9#499L AS col9#520L]
+- Scan ExistingRDD[_1#491L,_2#492L,_3#493L,_4#494L,_5#495L,_6#496L,_7#497L,_8#498L,_9#499L,_10#500L]
【讨论】:
我最近刚开始使用 PySpark,所以我不知道这是否有效。第二种方法(group by 和 join)不会在多个节点上使用并行性,而在第一种方法中这样做会很困难,考虑到我们有 sum 以及avg 作为聚合? @user8371915 你觉得这个***.com/questions/40888946/… 怎么样? @eliasah 看起来不错,但不是重复的,如果这是您的要求。以上是关于是在单个 groupBy 中还是分别进行所有聚合?的主要内容,如果未能解决你的问题,请参考以下文章
pandas使用groupby函数基于指定分组变量对dataframe数据进行分组使用mean函数计算每个分组中的所有数值变量的聚合平均值
添加 groupby 对象的单个数据框的数字列的 Pythonic 方法
pandas使用groupby函数基于指定分组变量对dataframe数据进行分组使用sum函数计算每个分组中的所有数值变量的聚合加和值