在 spark 数据框中的几列上替代 groupBy
Posted
技术标签:
【中文标题】在 spark 数据框中的几列上替代 groupBy【英文标题】:Alternative to groupBy on several columns in spark dataframe 【发布时间】:2018-04-05 03:18:39 【问题描述】:我有一个带有如下列的 spark 数据框:
df
--------------------------
A B C D E F amt
"A1" "B1" "C1" "D1" "E1" "F1" 1
"A2" "B2" "C2" "D2" "E2" "F2" 2
我想使用列组合执行 groupBy
(A, B, sum(amt))
(A, C, sum(amt))
(A, D, sum(amt))
(A, E, sum(amt))
(A, F, sum(amt))
使得生成的数据框看起来像:
df_grouped
----------------------
A field value amt
"A1" "B" "B1" 1
"A2" "B" "B2" 2
"A1" "C" "C1" 1
"A2" "C" "C2" 2
"A1" "D" "D1" 1
"A2" "D" "D2" 2
我的尝试如下:
val cols = Vector("B","C","D","E","F")
//code for creating empty data frame with structs for the cols A, field, value and act
for (col <- cols)
empty_df = empty_df.union (df.groupBy($"A",col)
.agg(sum(amt).as(amt)
.withColumn("field",lit(col)
.withColumnRenamed(col, "value"))
我觉得“for”或“foreach”的用法对于像 spark 这样的分布式环境可能很笨拙。对于我正在做的事情,地图功能是否有任何替代方案?在我看来, aggregateByKey 和 collect_list 可能有效;但是,我无法想象一个完整的解决方案。请指教。
【问题讨论】:
您只是想取消旋转B,C,D,E,F
的值吗? sum(amt)
在扮演什么角色?
我的原始数据框是一个大集合。为了使其简单易懂,我将其压缩为几行。由于它很大,我认为 for 循环在内存使用方面可能不是最好的方法,所以 foldleft 可能会更好。
【参考方案1】:
foldLeft
是在 Scala 中设计的非常强大的函数,如果您知道如何使用它。我建议您使用foldLeft
函数(我已评论了代码的清晰性和解释)
//selecting the columns without A and amt
val columnsForAggregation = df.columns.tail.toSet - "amt"
//creating an empty dataframe (format for final output
val finalDF = Seq(("empty", "empty", "empty", 0.0)).toDF("A", "field", "value", "amt")
//using foldLeft for the aggregation and merging each aggreted results
import org.apache.spark.sql.functions._
val (originaldf, transformeddf) = columnsForAggregation.foldLeft((df, finalDF))(tempdf, column) =>
//aggregation on the dataframe with A and one of the column and finally selecting as required in the outptu
val aggregatedf = tempdf._1.groupBy("A", column).agg(sum("amt").as("amt"))
.select(col("A"), lit(column).as("field"), col(column).as("value"), col("amt"))
//union the aggregated results and transferring dataframes for next loop
(df, tempdf._2.union(aggregatedf))
//finally removing the dummy row created
transformeddf.filter(col("A") =!= "empty")
.show(false)
你应该有你想要的数据框
+---+-----+-----+---+
|A |field|value|amt|
+---+-----+-----+---+
|A1 |E |E1 |1.0|
|A2 |E |E2 |2.0|
|A1 |F |F1 |1.0|
|A2 |F |F2 |2.0|
|A2 |B |B2 |2.0|
|A1 |B |B1 |1.0|
|A2 |C |C2 |2.0|
|A1 |C |C1 |1.0|
|A1 |D |D1 |1.0|
|A2 |D |D2 |2.0|
+---+-----+-----+---+
希望回答对你有帮助
上述foldLeft
函数的简明形式是
import org.apache.spark.sql.functions._
val (originaldf, transformeddf) = columnsForAggregation.foldLeft((df, finalDF))(tempdf, column) =>
(df, tempdf._2.union(tempdf._1.groupBy("A", column).agg(sum("amt").as("amt")).select(col("A"), lit(column).as("field"), col(column).as("value"), col("amt"))))
【讨论】:
这样更优雅。谢谢你。 foldleft 方法在内存使用方面也更好吗? 是的,我想是的。如果它帮助您考虑接受和支持 :) 我的原始数据框是一个大集合。为了使其简单易懂,我将其压缩为几行。由于它很大,我认为 for 循环在内存使用方面可能不是最好的方法,所以 foldleft 可能会更好。 去吧。测试并观察;)以上是关于在 spark 数据框中的几列上替代 groupBy的主要内容,如果未能解决你的问题,请参考以下文章