How to aggregate an array of struct in Spark with group by
Posted: 2019-09-02 12:48:46

I am using Spark 2.1. I have a dataframe with this schema:
scala> df.printSchema
|-- id: integer (nullable = true)
|-- sum: integer (nullable = true)
|-- distribution: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- lower: integer (nullable = true)
| | |-- upper: integer (nullable = true)
| | |-- count: integer (nullable = true)
I want to aggregate:

- group by the "id" column
- sum of the "sum" column
- sum of the "count" values inside "distribution", grouped by "lower" and "upper"

I cannot simply explode the dataframe here, because I would get duplicated rows and could no longer sum the "sum" column. One possibility is to aggregate the distribution separately and join the results back by "id" (a sketch of that approach follows the expected output below), but a user-defined function would be simpler.
As input, I have:
scala> df.show(false)
+---+---+------------------------------------------------------------+
|id |sum|distribution |
+---+---+------------------------------------------------------------+
|1 |1 |[[0,1,2]] |
|1 |1 |[[1,2,5]] |
|1 |7 |[[0,1,1], [1,2,6]] |
|1 |7 |[[0,1,5], [1,2,1], [2,3,1]] |
|2 |1 |[[0,1,1]] |
|2 |2 |[[0,1,1], [1,2,1]] |
|2 |1 |[[0,1,1]] |
|2 |1 |[[2,3,1]] |
|2 |1 |[[0,1,1]] |
|2 |4 |[[0,1,1], [1,2,1], [2,3,1], [3,4,1]] |
+---+---+------------------------------------------------------------+
Expected output:
+---+---+------------------------------------------------------------+
|id |sum|distribution |
+---+---+------------------------------------------------------------+
|1 |16 |[[0,1,8], [1,2,12], [2,3,1]] |
|2 |10 |[[0,1,5], [1,2,2], [2,3,3], [3,4,1]] |
+---+---+------------------------------------------------------------+
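
For reference, here is a minimal sketch of the join-based alternative mentioned above (an illustration of mine, not from the original post); it assumes the input dataframe is called df and uses only APIs available in Spark 2.1:

import org.apache.spark.sql.functions._

// Sum the "sum" column per id.
val sums = df.groupBy("id").agg(sum("sum").as("sum"))

// Explode the distribution, sum "count" per (id, lower, upper) bucket,
// then collect the buckets back into an array of structs.
val dist = df
  .select(col("id"), explode(col("distribution")).as("d"))
  .select(col("id"), col("d.lower").as("lower"), col("d.upper").as("upper"), col("d.count").as("count"))
  .groupBy("id", "lower", "upper")
  .agg(sum("count").as("count"))
  .groupBy("id")
  .agg(collect_list(struct("lower", "upper", "count")).as("distribution"))

// Join the two aggregates back together by id.
val result = sums.join(dist, Seq("id"))

Exploding only works here because "sum" is aggregated in a separate pass, which is exactly the duplication problem described above.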
Answer 1: You can use this UDF:
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructField, StructType}
// Schema of the UDF's return value: an array of (lower, upper, count) structs.
val schema: ArrayType = ArrayType(StructType(Seq(
  StructField("lower", IntegerType, false),
  StructField("upper", IntegerType, false),
  StructField("count", IntegerType, false)
)))
// The UDF receives the collect_list of the array column, i.e. a Seq[Seq[Row]].
val customAggregation = udf((xs: Seq[Seq[Row]]) =>
  xs.flatten
    .map(row => (
      row.getAs[Int]("lower"),
      row.getAs[Int]("upper"),
      row.getAs[Int]("count")
    ))
    .groupBy(x => (x._1, x._2))                  // group buckets by (lower, upper)
    .mapValues(_.map(_._3).sum).toSeq            // sum the counts within each bucket
    .map(x => (x._1._1, x._1._2, x._2)), schema  // back to (lower, upper, count)
)
val dfOutput: DataFrame = df_input
  .groupBy("id")
  .agg(sum("sum"), collect_list("distribution"))
  .toDF("id", "sum", "distribution")
  // apply the UDF to the collected array of arrays
  .withColumn("distribution_agg", customAggregation(col("distribution")))
The result will be:
scala> dfOutput.select("id","sum","distribution_agg").show(false)
+---+---+------------------------------------------------------------+
|id |sum|distribution_agg                                            |
+---+---+------------------------------------------------------------+
|1 |16 |[[0,1,8], [1,2,12], [2,3,1]] |
|2 |10 |[[0,1,5], [1,2,2], [2,3,3], [3,4,1]] |
+---+---+------------------------------------------------------------+
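
One caveat (an assumption of mine, not discussed in the original answer): Scala's groupBy does not preserve ordering, so the buckets inside distribution_agg may come back in arbitrary order. If the output should be ordered by "lower", as in the expected result, a sorted variant of the same UDF (reusing the schema defined above) can be used:

// Hypothetical variant of customAggregation that additionally sorts the
// buckets by "lower" so the output order is deterministic.
val customAggregationSorted = udf((xs: Seq[Seq[Row]]) =>
  xs.flatten
    .map(row => (row.getAs[Int]("lower"), row.getAs[Int]("upper"), row.getAs[Int]("count")))
    .groupBy(x => (x._1, x._2))                                   // group buckets by (lower, upper)
    .mapValues(_.map(_._3).sum)                                   // sum the counts within each bucket
    .toSeq
    .map { case ((lower, upper), count) => (lower, upper, count) }
    .sortBy(_._1),                                                // order by "lower"
  schema
)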