通过 Tuple2 的 key 将 Tuple2 的 value 部分作为一个 map 组合成单个 map 分组
Posted
技术标签:
【中文标题】通过 Tuple2 的 key 将 Tuple2 的 value 部分作为一个 map 组合成单个 map 分组【英文标题】:Combine value part of Tuple2 which is a map, into single map grouping by the key of Tuple2 【发布时间】:2020-12-18 04:42:00 【问题描述】:我在 Scala 和 Spark 中这样做。
我有 Dataset
的 Tuple2
和 Dataset[(String, Map[String, String])]
。
下面是Dataset
中的值示例。
(A, 1->100, 2->200, 3->100)
(B, 1->400, 4->300, 5->900)
(C, 6->100, 4->200, 5->100)
(B, 1->500, 9->300, 11->900)
(C, 7->100, 8->200, 5->800)
如果您注意到,元组的键或第一个元素可以重复。另外,同一个 Tuple 的 key 对应的 map 中可以有重复的 key(Tuple2 的第二部分)。
我想创建一个最终的Dataset[(String, Map[String, String])]
。并且输出应该如下(来自上面的例子)。此外,地图的最后一个键的值会被保留(检查 B 和 C),并且之前针对 B 和 C 的相同键会被删除。
(A, 1->100, 2->200, 3->100)
(B, 4->300, 1->500, 9->300, 11->900, 5->900)
(C, 6->100, 4->200, 7->100, 8->200, 5->800)
如果需要任何说明,请告诉我。
【问题讨论】:
【参考方案1】:使用数据框:
val df = Seq(("A", Map(1 -> 100, 2 -> 200, 3 -> 100)),
("B", Map(1 -> 400, 4 -> 300, 5 -> 900)),
("C", Map(6 -> 100, 4 -> 200, 5 -> 100)),
("B", Map(1 -> 500, 9 -> 300, 11 -> 900)),
("C", Map(7 -> 100, 8 -> 200, 5 -> 800))).toDF("a", "b")
val df2 = df.select('a, explode('b))
.groupBy("a", "key") //remove the duplicate keys
.agg(last('value).as("value")) //and take the last value for duplicate keys
.groupBy("a")
.agg(map_from_arrays(collect_list('key), collect_list('value)).as("b"))
df2.show()
打印
+---+---------------------------------------------------+
|a |b |
+---+---------------------------------------------------+
|B |[5 -> 900, 9 -> 300, 1 -> 500, 4 -> 300, 11 -> 900]|
|C |[6 -> 100, 8 -> 200, 7 -> 100, 4 -> 200, 5 -> 800] |
|A |[3 -> 100, 1 -> 100, 2 -> 200] |
+---+---------------------------------------------------+
由于涉及到两个聚合,基于 rdd 的 answer 可能会更快
【讨论】:
【参考方案2】:通过使用rdd,
val rdd = sc.parallelize(
Seq(("A", Map(1->100, 2->200, 3->100)),
("B", Map(1->400, 4->300, 5->900)),
("C", Map(6->100, 4->200, 5->100)),
("B", Map(1->500, 9->300, 11->900)),
("C", Map(7->100, 8->200, 5->800)))
)
rdd.reduceByKey((a, b) => a ++ b).collect()
// Array((A,Map(1 -> 100, 2 -> 200, 3 -> 100)), (B,Map(5 -> 900, 1 -> 500, 9 -> 300, 11 -> 900, 4 -> 300)), (C,Map(5 -> 800, 6 -> 100, 7 -> 100, 8 -> 200, 4 -> 200)))
并使用数据框,
val df = spark.createDataFrame(
Seq(("A", Map(1->100, 2->200, 3->100)),
("B", Map(1->400, 4->300, 5->900)),
("C", Map(6->100, 4->200, 5->100)),
("B", Map(1->500, 9->300, 11->900)),
("C", Map(7->100, 8->200, 5->800)))
).toDF("key", "map")
spark.conf.set("spark.sql.mapKeyDedupPolicy","LAST_WIN")
df.withColumn("map", map_entries($"map"))
.groupBy("key").agg(collect_list($"map").alias("map"))
.withColumn("map", flatten($"map"))
.withColumn("map", map_from_entries($"map")).show(false)
+---+---------------------------------------------------+
|key|map |
+---+---------------------------------------------------+
|B |[1 -> 500, 4 -> 300, 5 -> 900, 9 -> 300, 11 -> 900]|
|C |[6 -> 100, 4 -> 200, 5 -> 800, 7 -> 100, 8 -> 200] |
|A |[1 -> 100, 2 -> 200, 3 -> 100] |
+---+---------------------------------------------------+
【讨论】:
谢谢。会试试这个。我可以使用 DF/DS 执行此操作吗? @vijayinani 是的 Datasets 也有groupBy
和 reduceValues
函数。以上是关于通过 Tuple2 的 key 将 Tuple2 的 value 部分作为一个 map 组合成单个 map 分组的主要内容,如果未能解决你的问题,请参考以下文章
scala tuple1 tuple2 tuple3 有啥不同
Scala Tuple2Zipped vs IterableLike zip
写入数据帧时出错:java.lang.RuntimeException:scala.Tuple2 不是 struct<retailer:string,postcode:int> 架构的有效