如何在 Spark 中一次对多个列进行聚合

Posted

技术标签:

【中文标题】如何在 Spark 中一次对多个列进行聚合【英文标题】:How to do aggregation on multiple columns at once in Spark 【发布时间】:2016-08-12 19:57:35 【问题描述】:

我有一个包含多列的数据框。我想按其中一列分组并一次聚合其他列。假设该表有 4 列,cust_id、f1、f2、f3,我想按 cust_id 分组,然后获取 avg(f1)、avg(f2) 和 avg(f3)。该表将有很多列。有什么提示吗?

以下代码是一个好的开始,但由于我有很多列,手动编写它们可能不是一个好主意。

df.groupBy("cust_id").agg(sum("f1"), sum("f2"), sum("f3"))

【问题讨论】:

SparkSQL: apply aggregate functions to a list of column的可能重复 【参考方案1】:

也许您可以尝试使用列名映射列表:

val groupCol = "cust_id"
val aggCols = (df.columns.toSet - groupCol).map(
  colName => avg(colName).as(colName + "_avg")
).toList

df.groupBy(groupCol).agg(aggCols.head, aggCols.tail: _*)

或者,如果需要,您还可以匹配架构并根据类型构建聚合:

val aggCols = df.schema.collect 
  case StructField(colName, IntegerType, _, _) => avg(colName).as(colName + "_avg")
  case StructField(colName, StringType, _, _) => first(colName).as(colName + "_first")

【讨论】:

我怎样才能将 agg 列命名为 f1_avg 之类的名称? @H.Z.只需在后面加上.as()。在第一个示例中:.map(colName => avg(colName).as(colName+"_avg")) 在第二个示例中,只需将 .as() 放在函数后面 太不可思议了! agg(aggCols: _),但是 agg(aggCols.head, aggCols.tail: _) 可以!纯粹的魔法!你能解释一下背后的原因吗?谢谢。 @JennyYueJin 如果您查看docs 中可用的agg 签名,则没有agg(exprs: Column*) 选项,只有带有头部和尾部的选项。我不确定他们为什么选择这种行为,但我相信这是为了避免使用空参数列表调用agg,例如df.groupBy("col_a").agg(),这可以通过单个exprs: Column* 参数实现。

以上是关于如何在 Spark 中一次对多个列进行聚合的主要内容,如果未能解决你的问题,请参考以下文章

如何在 sklearn 中一次在多个列上应用预处理方法

flink一次对整个窗口进行聚合操作-ProcessWindowFunction

在Oracle中一次对补充数据执行两次UPDATE

如何在 C# 中一次播放多个声音

我如何知道向量函数 (SIMD) 是不是真的一次对多个对象起作用?

如何在Python中一次运行多个while循环[重复]