如何取消嵌套具有以下类型的 spark rdd ((String, scala.collection.immutable.Map[String,scala.collection.immutable.M

Posted

技术标签:

【中文标题】如何取消嵌套具有以下类型的 spark rdd ((String, scala.collection.immutable.Map[String,scala.collection.immutable.Map[String,Int]]))【英文标题】:How to un-nest a spark rdd that has the following type ((String, scala.collection.immutable.Map[String,scala.collection.immutable.Map[String,Int]])) 【发布时间】:2015-06-16 15:59:57 【问题描述】:

当我将它打印到屏幕上时,它是一个嵌套地图,内容如下

(5, Map ( "ABCD" -> Map("3200" -> 3,
                    "3350.800" -> 4, 
                    "200.300" -> 3)
 (1, Map ( "DEF" -> Map("1200" -> 32,
                        "1320.800" -> 4, 
                        "2100" -> 3)

我需要这样的东西

Case Class( 5, ABCD 3200, 3)
Case Class(5, ABCD 3350.800, 4)
CaseClass(5,ABCD., 200.300, 3)
CaseClass(1, DEF 1200, 32)
CaseClass(1 DEF, 1320.800, 4)

等等等等。 基本上是一个案例类列表

并将其映射到案例类对象,以便我可以将其保存到 cassandra。 我尝试过做 flatMapValues ,但它只将地图嵌套一层。还使用了 flatMap 。这也不起作用,或者我犯了错误

有什么建议吗?

【问题讨论】:

您的示例输出没有意义。你能把它写成scala类型吗? List(5, ABCD,3200, 3) 或类似的东西。 我希望它们作为案例类列表@JustinPihony 【参考方案1】:

使用 for-comprehension 和一些模式匹配来解构事物相当简单:

 val in = List((5, Map ( "ABCD" -> Map("3200" -> 3,  "3350.800" -> 4, "200.300" -> 3))),
               (1, Map ("DEF" -> Map("1200" -> 32, "1320.800" -> 4, "2100" -> 3))))

case class Thing(a:Int, b:String, c:String, d:Int)

 for   (index, m) <- in
        (k,v) <-m
        (innerK, innerV) <- v
        yield Thing(index, k, innerK, innerV) 

//> res0: List[maps.maps2.Thing] = List(Thing(5,ABCD,3200,3), 
//                                      Thing(5,ABCD,3350.800,4),
//                                      Thing(5,ABCD,200.300,3), 
//                                      Thing(1,DEF,1200,32),
//                                      Thing(1,DEF,1320.800,4),
//                                      Thing(1,DEF,2100,3))

所以让我们挑选一部分进行理解

(index, m) <- in

这是一样的

t <- in
(index, m) = t

在第一行t 将被依次设置为in 的每个元素。 t 因此是一个元组 (Int, Map(...)) 模式匹配让我们将元组的“模式”放在右侧,编译器将元组分开,将index 设置为 Int,将m 设置为 Map。

(k,v) <-m

和之前一样,这等价于

u <-m
(k, v) = u

这一次u 获取 Map 的每个元素。这又是键和值的元组。所以k被依次设置为每个键,v被设置为值。

v 是你的内部地图,所以我们对内部地图再次做同样的事情

(innerK, innerV) <- v

现在我们拥有了创建案例类所需的一切。 yield 只是说每次通过循环收集“产生”的任何东西。

yield Thing(index, k, innerK, innerV) 

在后台,这只是转换为一组地图/平面地图

yield 就是 Thing(index, k, innerK, innerV) 的值

我们为v 的每个元素获取其中一个

v.mapx=>val (innerK, innerV) = t;Thing(index, k, innerK, innerV)

但是外部地图的每个元素都有一个内部地图

m.flatMapy=>val (k, v) = y;v.mapx=>val (innerK, innerV) = t;Thing(index, k, innerK, innerV)

flatMap,因为如果我们刚刚执行map,我们会得到一个列表列表,并且我们希望将其展平为项目列表)

同样,我们对 List 中的每个元素都执行其中一项操作

in.flatMap (z => val (index, m) = z; m.flatMapy=>val (k, v) = y;v.mapx=>val (innerK, innerV) = t;Thing(index, k, innerK, innerV)

让我们在_1_2 style-y 中这样做。

in.flatMap (z=> z._2.flatMapy=>y._2.mapx=>;Thing(z._1, y._1, x._1, x._2)

产生完全相同的结果。但是作为理解不是更清楚吗?

【讨论】:

谢谢@Paul ....你的回答对我来说有点难以理解..你能解释一下正在发生的事情吗...我对 scala/模式匹配相当陌生特别是 如果你不知道模式匹配,你会错过一些很酷的工具,所以我建议你阅读它。我会添加一个解释 我确信 .. 会阅读@paul 对涉及模式匹配示例的博客页面的任何建议。你的回答现在完全有道理。谢谢解释 任何体面的 Scala 书籍都会涵盖这一点。我不知道有什么特别推荐的。 @The Archetypal Paul 你能帮忙并建议如何处理这个***.com/questions/62036791/…【参考方案2】:

如果你喜欢收集操作,你可以这样做

    case class Record(v1: Int, v2: String, v3: Double, v4: Int)

    val data = List(
      (5, Map ( "ABC" ->
        Map(
          3200. -> 3,
          3350.800 -> 4,
          200.300 -> 3))
        ),
      (1, Map ( "DEF" ->
        Map(
          1200. -> 32,
          1320.800 -> 4,
          2100. -> 3))
        )
    )

    val rdd = sc.parallelize(data)

    val result = rdd.flatMap(p => 
      p._2.toList
        .flatMap(q => q._2.toList.map(l => (q._1, l)))
        .map((p._1, _))
    ).map(p => Record(p._1, p._2._1, p._2._2._1, p._2._2._2))

    println(result.collect.toList)
    //List(
    //  Record(5,ABC,3200.0,3),
    //  Record(5,ABC,3350.8,4),
    //  Record(5,ABC,200.3,3),
    //  Record(1,DEF,1200.0,32),
    //  Record(1,DEF,1320.8,4),
    //  Record(1,DEF,2100.0,3)
    //)

【讨论】:

谢谢@user52045 我在一小时前从***.com/questions/30080136/scala-spark-array-mapping 发现了这一点,忘记写答案了.... 无论如何,谢谢你,我在做同样的事情。 可以缩短。您无需在映射之前将地图转换为列表。 是的,我也正要指出这一点......你的答案都是正确的,但保罗的回答在他的解释之后更具可读性。谢谢@Paul

以上是关于如何取消嵌套具有以下类型的 spark rdd ((String, scala.collection.immutable.Map[String,scala.collection.immutable.M的主要内容,如果未能解决你的问题,请参考以下文章

如何知道 Spark 使用 Scala 推断出的 RDD 类型是啥

如何取消嵌套混合类型的元组? [复制]

如何解决嵌套地图函数中的 SPARK-5063

在 Spark 中何时持久化以及何时取消持久化 RDD

将带有嵌套标签的 XML 读入 Spark RDD,并转换为 JSON

Spark - 嵌套 RDD 操作