Spark:数据帧聚合(Scala)
Posted
技术标签:
【中文标题】Spark:数据帧聚合(Scala)【英文标题】:Spark: DataFrame Aggregation (Scala) 【发布时间】:2018-01-24 07:01:15 【问题描述】:我有以下要求在 scala 中聚合 Spark 数据帧上的数据。 而且,我有两个数据集。
数据集 1 包含分布在几个不同列上的每个“t”类型的值 (val1, val2..),例如 (t1,t2...)。
val data1 = Seq(
("1","111",200,"221",100,"331",1000),
("2","112",400,"222",500,"332",1000),
("3","113",600,"223",1000,"333",1000)
).toDF("id1","t1","val1","t2","val2","t3","val3")
data1.show()
+---+---+----+---+----+---+----+
|id1| t1|val1| t2|val2| t3|val3|
+---+---+----+---+----+---+----+
| 1|111| 200|221| 100|331|1000|
| 2|112| 400|222| 500|332|1000|
| 3|113| 600|223|1000|333|1000|
+---+---+----+---+----+---+----+
数据集 2 代表相同的事物,每个“t”类型都有一个单独的行。
val data2 = Seq(("1","111",200),("1","221",100),("1","331",1000),
("2","112",400),("2","222",500),("2","332",1000),
("3","113",600),("3","223",1000), ("3","333",1000)
).toDF("id*","t*","val*")
data2.show()
+---+---+----+
|id*| t*|val*|
+---+---+----+
| 1|111| 200|
| 1|221| 100|
| 1|331|1000|
| 2|112| 400|
| 2|222| 500|
| 2|332|1000|
| 3|113| 600|
| 3|223|1000|
| 3|333|1000|
+---+---+----+
现在,我需要 groupBY(id,t,t*) 字段并将 sum(val) 和 sum(val*) 的余额打印为单独的记录。 并且两个余额应该相等。
My output should look like below:
+---+---+--------+---+---------+
|id1| t |sum(val)| t*|sum(val*)|
+---+---+--------+---+---------+
| 1|111| 200|111| 200|
| 1|221| 100|221| 100|
| 1|331| 1000|331| 1000|
| 2|112| 400|112| 400|
| 2|222| 500|222| 500|
| 2|332| 1000|332| 1000|
| 3|113| 600|113| 600|
| 3|223| 1000|223| 1000|
| 3|333| 1000|333| 1000|
+---+---+--------+---+---------+
我正在考虑将 dataset1 分解为每个“t”类型的多个记录,然后与 dataset2 连接。 但是您能否建议我一种更好的方法,如果数据集变大不会影响性能?
【问题讨论】:
【参考方案1】:最简单的解决方案是先进行子选择,然后合并数据集:
val ts = Seq(1, 2, 3)
val dfs = ts.map (t => data1.select("t" + t as "t", "v" + t as "v"))
val unioned = dfs.drop(1).foldLeft(dfs(0))((l, r) => l.union(r))
val ds = unioned.join(df2, 't === col("t*")
here aggregation
你也可以尝试使用explode数组:
val df1 = data1.withColumn("colList", array('t1, 't2, 't3))
.withColumn("t", explode(colList))
.select('t, 'id1 as "id")
val ds = df2.withColumn("val",
when('t === 't1, 'val1)
.when('t === 't2, 'val2)
.when('t === 't3, 'val3)
.otherwise(0))
最后一步是将这个 Dataset 与 data2 连接起来:
ds.join(data2, 't === col("t*"))
.groupBy("t", "t*")
.agg(first("id1") as "id1", sum(val), sum("val*"))
【讨论】:
以上是关于Spark:数据帧聚合(Scala)的主要内容,如果未能解决你的问题,请参考以下文章
Spark多个动态聚合函数,countDistinct不起作用
如何使用 spark-scala 在 spark 数据帧上执行枢轴?