通过 Tuple2 的 key 将 Tuple2 的 value 部分作为一个 map 组合成单个 map 分组

Posted

技术标签:

【中文标题】通过 Tuple2 的 key 将 Tuple2 的 value 部分作为一个 map 组合成单个 map 分组【英文标题】:Combine value part of Tuple2 which is a map, into single map grouping by the key of Tuple2 【发布时间】:2020-12-18 04:42:00 【问题描述】:

我在 Scala 和 Spark 中这样做。

我有 DatasetTuple2Dataset[(String, Map[String, String])]

下面是Dataset 中的值示例。

(A, 1->100, 2->200, 3->100)
(B, 1->400, 4->300, 5->900)
(C, 6->100, 4->200, 5->100)
(B, 1->500, 9->300, 11->900)
(C, 7->100, 8->200, 5->800)

如果您注意到,元组的键或第一个元素可以重复。另外,同一个 Tuple 的 key 对应的 map 中可以有重复的 key(Tuple2 的第二部分)。

我想创建一个最终的Dataset[(String, Map[String, String])]。并且输出应该如下(来自上面的例子)。此外,地图的最后一个键的值会被保留(检查 B 和 C),并且之前针对 B 和 C 的相同键会被删除。

(A, 1->100, 2->200, 3->100)
(B, 4->300, 1->500, 9->300, 11->900, 5->900)
(C, 6->100, 4->200, 7->100, 8->200, 5->800)

如果需要任何说明,请告诉我。

【问题讨论】:

【参考方案1】:

使用数据框:

val df = Seq(("A", Map(1 -> 100, 2 -> 200, 3 -> 100)),
    ("B", Map(1 -> 400, 4 -> 300, 5 -> 900)),
    ("C", Map(6 -> 100, 4 -> 200, 5 -> 100)),
    ("B", Map(1 -> 500, 9 -> 300, 11 -> 900)),
    ("C", Map(7 -> 100, 8 -> 200, 5 -> 800))).toDF("a", "b")

val df2 = df.select('a, explode('b))
    .groupBy("a", "key")           //remove the duplicate keys
    .agg(last('value).as("value")) //and take the last value for duplicate keys
    .groupBy("a")
    .agg(map_from_arrays(collect_list('key), collect_list('value)).as("b"))
df2.show()

打印

+---+---------------------------------------------------+
|a  |b                                                  |
+---+---------------------------------------------------+
|B  |[5 -> 900, 9 -> 300, 1 -> 500, 4 -> 300, 11 -> 900]|
|C  |[6 -> 100, 8 -> 200, 7 -> 100, 4 -> 200, 5 -> 800] |
|A  |[3 -> 100, 1 -> 100, 2 -> 200]                     |
+---+---------------------------------------------------+

由于涉及到两个聚合,基于 rdd 的 answer 可能会更快

【讨论】:

【参考方案2】:

通过使用rdd,

val rdd = sc.parallelize(
    Seq(("A", Map(1->100, 2->200, 3->100)),
        ("B", Map(1->400, 4->300, 5->900)),
        ("C", Map(6->100, 4->200, 5->100)),
        ("B", Map(1->500, 9->300, 11->900)),
        ("C", Map(7->100, 8->200, 5->800)))
)

rdd.reduceByKey((a, b) => a ++ b).collect()

// Array((A,Map(1 -> 100, 2 -> 200, 3 -> 100)), (B,Map(5 -> 900, 1 -> 500, 9 -> 300, 11 -> 900, 4 -> 300)), (C,Map(5 -> 800, 6 -> 100, 7 -> 100, 8 -> 200, 4 -> 200)))

并使用数据框,

val df = spark.createDataFrame(
    Seq(("A", Map(1->100, 2->200, 3->100)),
        ("B", Map(1->400, 4->300, 5->900)),
        ("C", Map(6->100, 4->200, 5->100)),
        ("B", Map(1->500, 9->300, 11->900)),
        ("C", Map(7->100, 8->200, 5->800)))
).toDF("key", "map")

spark.conf.set("spark.sql.mapKeyDedupPolicy","LAST_WIN")

df.withColumn("map", map_entries($"map"))
  .groupBy("key").agg(collect_list($"map").alias("map"))
  .withColumn("map", flatten($"map"))
  .withColumn("map", map_from_entries($"map")).show(false)

+---+---------------------------------------------------+
|key|map                                                |
+---+---------------------------------------------------+
|B  |[1 -> 500, 4 -> 300, 5 -> 900, 9 -> 300, 11 -> 900]|
|C  |[6 -> 100, 4 -> 200, 5 -> 800, 7 -> 100, 8 -> 200] |
|A  |[1 -> 100, 2 -> 200, 3 -> 100]                     |
+---+---------------------------------------------------+

【讨论】:

谢谢。会试试这个。我可以使用 DF/DS 执行此操作吗? @vijayinani 是的 Datasets 也有 groupByreduceValues 函数。

以上是关于通过 Tuple2 的 key 将 Tuple2 的 value 部分作为一个 map 组合成单个 map 分组的主要内容,如果未能解决你的问题,请参考以下文章

scala tuple1 tuple2 tuple3 有啥不同

Scala Tuple2Zipped vs IterableLike zip

写入数据帧时出错:java.lang.RuntimeException:scala.Tuple2 不是 struct<retailer:string,postcode:int> 架构的有效

Python之元组

JavaPairRDD 到 SPARK 中的数据集<Row>

6.Pair RDD操作