Spark Dataset Join and Aggregate columns
Posted: 2019-07-25 22:36:38

Question: I have three Spark Datasets of the same type A:
case class A(col_a: String, col_b: Int, col_c: Int, col_d: Int, col_e: Int)
val ds_one = Seq(A("a", 12, 0, 0, 0), A("b", 11, 0, 0, 0)).toDS
val ds_two = Seq(A("a", 0, 16, 0, 0), A("b", 0, 73, 0, 0)).toDS
val ds_three = Seq(A("a", 0, 0, 9, 0), A("b", 0, 0, 64, 0)).toDS
How can I reduce the three Datasets into a single Dataset[A]:
val ds_combined = Seq(A("a", 12, 16, 9, 0), A("b", 11, 73, 64, 0)).toDS
Answer 1: It looks like you just need to group by col_a and take the max of each column:
import org.apache.spark.sql.functions._
import spark.implicits._  // required for .toDS and the .as[A] encoder outside spark-shell

case class A(col_a: String, col_b: Int, col_c: Int, col_d: Int, col_e: Int)

val ds_one   = Seq(A("a", 12, 0, 0, 0), A("b", 11, 0, 0, 0)).toDS
val ds_two   = Seq(A("a", 0, 16, 0, 0), A("b", 0, 73, 0, 0)).toDS
val ds_three = Seq(A("a", 0, 0, 9, 0), A("b", 0, 0, 64, 0)).toDS

// Stack the three Datasets row-wise, then collapse each key to a single row
val ds_union = ds_one.union(ds_two).union(ds_three)

val ds_combined = ds_union
  .groupBy("col_a")
  .agg(
    max("col_b").alias("col_b"),
    max("col_c").alias("col_c"),
    max("col_d").alias("col_d"),
    max("col_e").alias("col_e"))
  .as[A]

ds_combined.show
ds_combined: org.apache.spark.sql.Dataset[A]
+-----+-----+-----+-----+-----+
|col_a|col_b|col_c|col_d|col_e|
+-----+-----+-----+-----+-----+
| b| 11| 73| 64| 0|
| a| 12| 16| 9| 0|
+-----+-----+-----+-----+-----+
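Since every value column is non-zero in only one of the three input Datasets, summing instead of taking the max produces the same result here; a minimal sketch under that assumption, reusing ds_union from above (note that sum over an Int column yields a Long, so a cast back to int is needed before .as[A]):

// Assumes the sample data above, where each value column comes from exactly one input Dataset
val ds_summed = ds_union
  .groupBy("col_a")
  .agg(
    sum("col_b").cast("int").alias("col_b"),
    sum("col_c").cast("int").alias("col_c"),
    sum("col_d").cast("int").alias("col_d"),
    sum("col_e").cast("int").alias("col_e"))
  .as[A]

If the inputs could ever overlap on the same column, the two approaches diverge: max keeps the largest value while sum adds them up, so pick whichever matches the intended semantics.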