我如何将平面数据框转换为 spark(scala 或 java)中的嵌套 json

Posted

技术标签:

【中文标题】我如何将平面数据框转换为 spark(scala 或 java)中的嵌套 json【英文标题】:how do i convert a flat dataframe into a nested json in spark (scala or java) 【发布时间】:2017-09-19 14:56:45 【问题描述】:

我有 sql 查询,它返回数据帧中可用的这样的数据集

id,type,name,ppu,batter.id,batter.type,topping.id,topping.type
101,donut,cake,0_55,1001,Regular,5001,None
101,donut,cake,0_55,1002,Chocolate,5001,None
101,donut,cake,0_55,1003,Blueberry,5001,None
101,donut,cake,0_55,1004,Devil's Food,5001,None
101,donut,cake,0_55,1001,Regular,5002,Glazed
101,donut,cake,0_55,1002,Chocolate,5002,Glazed
101,donut,cake,0_55,1003,Blueberry,5002,Glazed
101,donut,cake,0_55,1004,Devil's Food,5002,Glazed
101,donut,cake,0_55,1001,Regular,5003,Chocolate
101,donut,cake,0_55,1002,Chocolate,5003,Chocolate
101,donut,cake,0_55,1003,Blueberry,5003,Chocolate
101,donut,cake,0_55,1004,Devil's Food,5003,Chocolate

我需要将它覆盖到这样的嵌套 json 结构中。


    "id": "101",
    "type": "donut",
    "name": "Cake",
    "ppu": 0.55,
    "batter":
        [
             "id": "1001", "type": "Regular" ,
             "id": "1002", "type": "Chocolate" ,
             "id": "1003", "type": "Blueberry" ,
             "id": "1004", "type": "Devil's Food" 
        ],
    "topping":
        [
             "id": "5001", "type": "None" ,
             "id": "5002", "type": "Glazed" ,
             "id": "5003", "type": "Chocolate" 
        ]

我们是否有可能在我必须编写的 Dataframe 聚合或自定义转换中执行此操作。

在这里找到类似的问题 Writing nested JSON in spark scala 但没有完全正确的答案。

【问题讨论】:

【参考方案1】:

因此,显然没有直接的方法可以通过数据框 API 完成此任务。你可以使用

df.toJson...

但它不会给你想要的输出。

您必须编写一个凌乱的转换,我很想听听任何其他可能的解决方案。我假设您的结果适合内存,因为它必须带回驱动程序。另外,我在这里使用 Gson API 创建 json。

def arrToJson(arr: Array[Row]): JsonObject = 
    val jo = new JsonObject
    arr.map(row => ((row(0) + "," + row(1) + "," + row(2) + "," + row(3)),
      (row(4) + "," + row(5) + "," + row(6) + "," + row(7))))
      .groupBy(_._1).map(f => (f._1.split(","), f._2.map(_._2.split(","))))
      .foreach  x =>
        

          jo.addProperty("id", x._1(0))
          jo.addProperty("type", x._1(1))
          jo.addProperty("name", x._1(2))
          jo.addProperty("ppu", x._1(3))

          val bja = new JsonArray
          val tja = new JsonArray
          x._2.foreach(f => 
            val bjo = new JsonObject
            val tjo = new JsonObject

            bjo.addProperty("id", f(0))
            bjo.addProperty("type", f(1))

            tjo.addProperty("id", f(2))
            tjo.addProperty("type", f(3))

            bja.add(bjo)
            tja.add(tjo)
          )
          jo.add("batter", bja)
          jo.add("topping", tja)

        
      

    jo
  

【讨论】:

谢谢 Chitral,但不幸的是,在我的情况下,这可能会变得复杂,有 100 多列和至少 2 级嵌套。同样在我的情况下,数据可能不适合驱动程序,所以我希望在分布式节点中进行转换。我可以在某种程度上使用 UDAF,但只能用于只有一个嵌套对象的简单对象。我听说 Spark 2.0 对此有更好的支持。但不确定。再次感谢您的尝试:)

以上是关于我如何将平面数据框转换为 spark(scala 或 java)中的嵌套 json的主要内容,如果未能解决你的问题,请参考以下文章

在 spark 和 scala 中,如何将数据框转换或映射到特定列信息?

如何使用scala将特定函数转换为apache spark中的udf函数? [复制]

如何将 RDD [GenericRecord] 转换为 scala 中的数据框?

如何使用 Spark 数据框列上的函数或方法使用 Scala 进行转换

如何在 Scala 中将数据帧转换为 Apache Spark 中的数据集?

spark scala将嵌套的数据框转换为嵌套的数据集