如何在 Spark 中一次对多个列进行聚合
Posted
技术标签:
【中文标题】如何在 Spark 中一次对多个列进行聚合【英文标题】:How to do aggregation on multiple columns at once in Spark 【发布时间】:2016-08-12 19:57:35 【问题描述】:我有一个包含多列的数据框。我想按其中一列分组并一次聚合其他列。假设该表有 4 列,cust_id、f1、f2、f3,我想按 cust_id 分组,然后获取 avg(f1)、avg(f2) 和 avg(f3)。该表将有很多列。有什么提示吗?
以下代码是一个好的开始,但由于我有很多列,手动编写它们可能不是一个好主意。
df.groupBy("cust_id").agg(sum("f1"), sum("f2"), sum("f3"))
【问题讨论】:
SparkSQL: apply aggregate functions to a list of column的可能重复 【参考方案1】:也许您可以尝试使用列名映射列表:
val groupCol = "cust_id"
val aggCols = (df.columns.toSet - groupCol).map(
colName => avg(colName).as(colName + "_avg")
).toList
df.groupBy(groupCol).agg(aggCols.head, aggCols.tail: _*)
或者,如果需要,您还可以匹配架构并根据类型构建聚合:
val aggCols = df.schema.collect
case StructField(colName, IntegerType, _, _) => avg(colName).as(colName + "_avg")
case StructField(colName, StringType, _, _) => first(colName).as(colName + "_first")
【讨论】:
我怎样才能将 agg 列命名为 f1_avg 之类的名称? @H.Z.只需在后面加上.as()
。在第一个示例中:.map(colName => avg(colName).as(colName+"_avg"))
在第二个示例中,只需将 .as()
放在函数后面
太不可思议了! agg(aggCols: _),但是 agg(aggCols.head, aggCols.tail: _) 可以!纯粹的魔法!你能解释一下背后的原因吗?谢谢。
@JennyYueJin 如果您查看docs 中可用的agg
签名,则没有agg(exprs: Column*)
选项,只有带有头部和尾部的选项。我不确定他们为什么选择这种行为,但我相信这是为了避免使用空参数列表调用agg
,例如df.groupBy("col_a").agg()
,这可以通过单个exprs: Column*
参数实现。以上是关于如何在 Spark 中一次对多个列进行聚合的主要内容,如果未能解决你的问题,请参考以下文章
flink一次对整个窗口进行聚合操作-ProcessWindowFunction