在 spark 数据框中的几列上替代 groupBy

Posted

技术标签:

【中文标题】在 spark 数据框中的几列上替代 groupBy【英文标题】:Alternative to groupBy on several columns in spark dataframe 【发布时间】:2018-04-05 03:18:39 【问题描述】:

我有一个带有如下列的 spark 数据框:

df
--------------------------
A     B     C    D    E    F   amt
"A1"  "B1" "C1" "D1" "E1"  "F1"  1
"A2"  "B2" "C2" "D2" "E2"  "F2"  2

我想使用列组合执行 groupBy

(A, B, sum(amt))
(A, C, sum(amt))
(A, D, sum(amt))
(A, E, sum(amt))
(A, F, sum(amt))

使得生成的数据框看起来像:

df_grouped
----------------------
A     field    value   amt
"A1"    "B"     "B1"    1
"A2"    "B"     "B2"    2
"A1"    "C"     "C1"    1
"A2"    "C"     "C2"    2
"A1"    "D"     "D1"    1
"A2"    "D"     "D2"    2

我的尝试如下:

val cols = Vector("B","C","D","E","F")
//code for creating empty data frame with structs for the cols A, field, value and act
for (col <- cols)
   empty_df = empty_df.union (df.groupBy($"A",col)
  .agg(sum(amt).as(amt)
  .withColumn("field",lit(col)
  .withColumnRenamed(col, "value"))

我觉得“for”或“foreach”的用法对于像 spark 这样的分布式环境可能很笨拙。对于我正在做的事情,地图功能是否有任何替代方案?在我看来, aggregateByKey 和 collect_list 可能有效;但是,我无法想象一个完整的解决方案。请指教。

【问题讨论】:

您只是想取消旋转B,C,D,E,F 的值吗? sum(amt)在扮演什么角色? 我的原始数据框是一个大集合。为了使其简单易懂,我将其压缩为几行。由于它很大,我认为 for 循环在内存使用方面可能不是最好的方法,所以 foldleft 可能会更好。 【参考方案1】:

foldLeft 是在 Scala 中设计的非常强大的函数,如果您知道如何使用它。我建议您使用foldLeft 函数(我已评论了代码的清晰性和解释

//selecting the columns without A and amt
val columnsForAggregation = df.columns.tail.toSet - "amt"

//creating an empty dataframe (format for final output
val finalDF = Seq(("empty", "empty", "empty", 0.0)).toDF("A", "field", "value", "amt")

//using foldLeft for the aggregation and merging each aggreted results
import org.apache.spark.sql.functions._
val (originaldf, transformeddf) = columnsForAggregation.foldLeft((df, finalDF))(tempdf, column) => 
  //aggregation on the dataframe with A and one of the column and finally selecting as required in the outptu
  val aggregatedf = tempdf._1.groupBy("A", column).agg(sum("amt").as("amt"))
    .select(col("A"), lit(column).as("field"), col(column).as("value"), col("amt"))
  //union the aggregated results and transferring dataframes for next loop
  (df, tempdf._2.union(aggregatedf))



//finally removing the dummy row created
transformeddf.filter(col("A") =!= "empty")
  .show(false)

你应该有你想要的数据框

+---+-----+-----+---+
|A  |field|value|amt|
+---+-----+-----+---+
|A1 |E    |E1   |1.0|
|A2 |E    |E2   |2.0|
|A1 |F    |F1   |1.0|
|A2 |F    |F2   |2.0|
|A2 |B    |B2   |2.0|
|A1 |B    |B1   |1.0|
|A2 |C    |C2   |2.0|
|A1 |C    |C1   |1.0|
|A1 |D    |D1   |1.0|
|A2 |D    |D2   |2.0|
+---+-----+-----+---+

希望回答对你有帮助

上述foldLeft函数的简明形式

import org.apache.spark.sql.functions._
val (originaldf, transformeddf) = columnsForAggregation.foldLeft((df, finalDF))(tempdf, column) => 
  (df, tempdf._2.union(tempdf._1.groupBy("A", column).agg(sum("amt").as("amt")).select(col("A"), lit(column).as("field"), col(column).as("value"), col("amt"))))

【讨论】:

这样更优雅。谢谢你。 foldleft 方法在内存使用方面也更好吗? 是的,我想是的。如果它帮助您考虑接受和支持 :) 我的原始数据框是一个大集合。为了使其简单易懂,我将其压缩为几行。由于它很大,我认为 for 循环在内存使用方面可能不是最好的方法,所以 foldleft 可能会更好。 去吧。测试并观察;)

以上是关于在 spark 数据框中的几列上替代 groupBy的主要内容,如果未能解决你的问题,请参考以下文章

从数据框中删除“重复”行(它们在几列中有所不同)[重复]

如何将数据框的几列与其其他列进行比较

如何在数据框中的每一列上运行 udf?

在R中的几列中获取月度均值的有效方法

从 spark 数据框中的列生成不同的值

Spark 动态 DAG 比硬编码的 DAG 慢很多