How to explode nested json array in data frame

Posted: 2020-05-02 20:29:25

Question:

This is my input data frame structure:

root
|--Name (String)
|--Version (int)
|--Details (array)

Something like this:

{
"Name":"json",
"Version":1,
"Details":[
"{
    \"Id\":\"123\",
    \"TaxDetails\":[{\"TaxDetail1\":\"val1\"}, {\"TaxDetail2\":\"val2\"}]
}",
"{
    \"Id\":\"234\",
    \"TaxDetails\":[{\"TaxDetail3\":\"val3\"}, {\"TaxDetail4\":\"val4\"}]
}"
]
}

I want to explode this at the TaxDetails level, as shown below:

{
"Name":"json",
"Version":1,
"TaxDetail":{\"TaxDetail1\":\"val1\"}
}

{
"Name":"json",
"Version":1,
"TaxDetail":{\"TaxDetail2\":\"val2\"}
}

{
"Name":"json",
"Version":1,
"TaxDetail":{\"TaxDetail3\":\"val3\"}
}

{
"Name":"json",
"Version":1,
"TaxDetail":{\"TaxDetail4\":\"val4\"}
}

I have already exploded Details with the explode function like this:

val explodedDetailDf = inputDf.withColumn("Detail", explode($"Details"))

Now the data type of the Detail column is string, and when I try this:

val explodedTaxDetail = explodedDetailDf.withColumn("TaxDetail", explode($"Detail.TaxDetails"))

The above operation fails with the error "AnalysisException due to data type mismatch: input to function explode should be array or map type, not string".

How can I explode a nested json array like this?

Comments:

- You must first convert the JSON string column into a struct column using the from_json() function from org.apache.spark.sql.functions.
- Your json is corrupted.
- Hi, can you check my answer? I did not create a manual schema; I formatted and used the existing corrupted json.
- I also used the explode function twice without from_json, which is a common approach; just take a look at it. We can discuss further if needed.

Answer 1:

explode accepts a value of map or array type, but not a string.

From your sample json, the type of Detail.TaxDetails is string, not array.

To extract the string-typed Detail.TaxDetails value, you have to use

def from_json(e: org.apache.spark.sql.Column,schema: org.apache.spark.sql.types.StructType): org.apache.spark.sql.Column
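As a minimal, hypothetical sketch of that signature (the DataFrame, column names, and local SparkSession here are made up for illustration), parsing a JSON string column into a struct looks like this:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object FromJsonSketch extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // A DataFrame whose "raw" column holds JSON as a plain string
  val df = Seq("""{"Id":"123"}""").toDF("raw")

  // Schema describing the JSON inside the string column
  val schema = StructType(Seq(StructField("Id", StringType, nullable = true)))

  // from_json turns the string column into a struct column;
  // nested fields then become addressable with dot syntax
  val parsed = df.withColumn("parsed", from_json($"raw", schema))
  parsed.select($"parsed.Id".as("Id")).show(false)

  spark.stop()
}
```

Once the column is a struct, explode can be applied to any array field inside it.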

Note

Your json is corrupted; I have modified your json as below.

scala> val json = """{
     |   "Name": "json",
     |   "Version": 1,
     |   "Details": [
     |     "{\"Id\":\"123\",\"TaxDetails\":[{\"TaxDetail1\":\"val1\"}, {\"TaxDetail2\":\"val2\"}]}",
     |     "{\"Id\":\"234\",\"TaxDetails\":[{\"TaxDetail3\":\"val3\"}, {\"TaxDetail4\":\"val4\"}]}"
     |   ]
     | }"""

json: String =
{
  "Name": "json",
  "Version": 1,
  "Details": [
    "{\"Id\":\"123\",\"TaxDetails\":[{\"TaxDetail1\":\"val1\"}, {\"TaxDetail2\":\"val2\"}]}",
    "{\"Id\":\"234\",\"TaxDetails\":[{\"TaxDetail3\":\"val3\"}, {\"TaxDetail4\":\"val4\"}]}"
  ]
}


Please see the code below for how to extract the values of Detail.TaxDetails.


scala> val df = spark.read.json(Seq(json).toDS)
df: org.apache.spark.sql.DataFrame = [Details: array<string>, Name: string ... 1 more field]

scala> df.printSchema
root
 |-- Details: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Name: string (nullable = true)
 |-- Version: long (nullable = true)

scala> df.withColumn("details",explode($"details").as("details")).show(false) // inside details array has string values.
+------------------------------------------------------------------------+----+-------+
|details                                                                 |Name|Version|
+------------------------------------------------------------------------+----+-------+
|{"Id":"123","TaxDetails":[{"TaxDetail1":"val1"}, {"TaxDetail2":"val2"}]}|json|1      |
|{"Id":"234","TaxDetails":[{"TaxDetail3":"val3"}, {"TaxDetail4":"val4"}]}|json|1      |
+------------------------------------------------------------------------+----+-------+

scala> val json = spark.read.json(Seq("""[{"Id": "123","TaxDetails": [{"TaxDetail1": "val1"},{"TaxDetail2": "val2"}]},{"Id": "234","TaxDetails": [{"TaxDetail3": "val3"},{"TaxDetail4": "val4"}]}]""").toDS).schema.json
json: String = {"type":"struct","fields":[{"name":"Id","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetails","type":{"type":"array","elementType":{"type":"struct","fields":[{"name":"TaxDetail1","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetail2","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetail3","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetail4","type":"string","nullable":true,"metadata":{}}]},"containsNull":true},"nullable":true,"metadata":{}}]}

scala> val schema = DataType.fromJson(json).asInstanceOf[StructType] // Creating schema for inner string
schema: org.apache.spark.sql.types.StructType = StructType(StructField(Id,StringType,true), StructField(TaxDetails,ArrayType(StructType(StructField(TaxDetail1,StringType,true), StructField(TaxDetail2,StringType,true), StructField(TaxDetail3,StringType,true), StructField(TaxDetail4,StringType,true)),true),true))

scala> spark.time(df.withColumn("details",explode($"details")).withColumn("details",from_json($"details",schema)).withColumn("id",$"details.id").withColumn("taxdetails",explode($"details.taxdetails")).select($"name",$"version",$"id",$"taxdetails.*").show(false))
+----+-------+---+----------+----------+----------+----------+
|name|version|id |TaxDetail1|TaxDetail2|TaxDetail3|TaxDetail4|
+----+-------+---+----------+----------+----------+----------+
|json|1      |123|val1      |val2      |null      |null      |
|json|1      |234|null      |null      |val3      |val4      |
+----+-------+---+----------+----------+----------+----------+


scala>

Updated

Above I took the json manually and created the schema. Please check the code below to derive the schema from the available data.

scala> spark.read.json(df.withColumn("details",explode($"details").as("details")).select("details").map(_.getAs[String](0))).printSchema
root
 |-- Id: string (nullable = true)
 |-- TaxDetails: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- TaxDetail1: string (nullable = true)
 |    |    |-- TaxDetail2: string (nullable = true)
 |    |    |-- TaxDetail3: string (nullable = true)
 |    |    |-- TaxDetail4: string (nullable = true)


scala> spark.read.json(df.withColumn("details",explode($"details").as("details")).select("details").map(_.getAs[String](0))).schema
res12: org.apache.spark.sql.types.StructType = StructType(StructField(Id,StringType,true), StructField(TaxDetails,ArrayType(StructType(StructField(TaxDetail1,StringType,true), StructField(TaxDetail2,StringType,true), StructField(TaxDetail3,StringType,true), StructField(TaxDetail4,StringType,true)),true),true))

scala> val schema = spark.read.json(df.withColumn("details",explode($"details").as("details")).select("details").map(_.getAs[String](0))).schema
schema: org.apache.spark.sql.types.StructType = StructType(StructField(Id,StringType,true), StructField(TaxDetails,ArrayType(StructType(StructField(TaxDetail1,StringType,true), StructField(TaxDetail2,StringType,true), StructField(TaxDetail3,StringType,true), StructField(TaxDetail4,StringType,true)),true),true))

scala> spark.time(df.withColumn("details",explode($"details")).withColumn("details",from_json($"details",schema)).withColumn("id",$"details.id").withColumn("taxdetails",explode($"details.taxdetails")).select($"name",$"version",$"id",$"taxdetails.*").show(false))
+----+-------+---+----------+----------+----------+----------+
|name|version|id |TaxDetail1|TaxDetail2|TaxDetail3|TaxDetail4|
+----+-------+---+----------+----------+----------+----------+
|json|1      |123|val1      |val2      |null      |null      |
|json|1      |234|null      |null      |val3      |val4      |
+----+-------+---+----------+----------+----------+----------+

Time taken: 212 ms

scala>
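As a side note, on Spark 2.4+ the separate schema-derivation pass can be skipped with `schema_of_json`, which infers the schema from a sample literal at plan time. This is a sketch under the assumption that the sample string below covers every field present in the data (field names are taken from the question):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, from_json, lit, schema_of_json}

object SchemaOfJsonSketch extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  val json =
    """{"Name":"json","Version":1,"Details":["{\"Id\":\"123\",\"TaxDetails\":[{\"TaxDetail1\":\"val1\"},{\"TaxDetail2\":\"val2\"}]}"]}"""
  val df = spark.read.json(Seq(json).toDS)

  // One representative Details string; its inferred schema is reused for all rows
  val sample = """{"Id":"123","TaxDetails":[{"TaxDetail1":"val1"},{"TaxDetail2":"val2"}]}"""

  val result = df
    .withColumn("detail", explode($"Details"))                              // array<string> -> string rows
    .withColumn("detail", from_json($"detail", schema_of_json(lit(sample)))) // string -> struct
    .withColumn("tax", explode($"detail.TaxDetails"))                        // inner array -> rows
    .select($"Name", $"Version", $"detail.Id".as("Id"), $"tax.*")

  result.show(false)
  spark.stop()
}
```

The trade-off is that the sample literal must be representative: fields absent from it are silently dropped from every row.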

Comments:

- Sorry about the corrupted json file. I created this data from my actual data and posted it without checking. Thanks @Srinivas for the answer. You created the json manually when creating the schema. Could you explain how to get the string directly from the Df?
- I took those strings manually and created a dataframe to get the schema; I have now updated the code to get the schema from the available DF.
- @Srinivas I did it without the from_json function, you can take a look. Someone downvoted without mentioning a reason.
- @RamGhadiyaram, of course I will check. Details is an array of strings, so I have to use the from_json function to convert the string into an object. In your json, Details is an array of objects, so there is no need to use from_json there.

Answer 2:

Since the json you provided earlier is corrupted, I formatted the json this way; then you can use your explode twice and flatten the data frame.

Implemented as below...

package examples

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object JsonTest extends App {
  Logger.getLogger("org").setLevel(Level.OFF)

  private[this] implicit val spark = SparkSession.builder().master("local[*]").getOrCreate()

  import spark.implicits._

  val jsonString =
    """
      |{
      |  "Name": "json",
      |  "Version": "1",
      |  "Details": [
      |    {
      |      "Id": "123",
      |      "TaxDetails": [
      |        {
      |          "TaxDetail1": "val1",
      |          "TaxDetail2": "val2"
      |        }
      |      ]
      |    },
      |    {
      |      "Id": "234",
      |      "TaxDetails": [
      |        {
      |          "TaxDetail3": "val3",
      |          "TaxDetail4": "val4"
      |        }
      |      ]
      |    }
      |  ]
      |}
    """.stripMargin
  val df3 = spark.read.json(Seq(jsonString).toDS)
  df3.printSchema()
  df3.show(false)
  val explodedDetailDf = df3.withColumn("Detail", explode($"Details"))
  // explodedDetailDf.show(false)
  val explodedTaxDetail = explodedDetailDf.withColumn("TaxDetail", explode($"Detail.TaxDetails"))
  explodedTaxDetail.show(false)

  val finaldf = explodedTaxDetail.select($"Name", $"Version",
    to_json(struct(col("TaxDetail.TaxDetail1").as("TaxDetail1"))).as("TaxDetails"))
    .union(
      explodedTaxDetail.select($"Name", $"Version",
        to_json(struct(col("TaxDetail.TaxDetail2").as("TaxDetail2"))).as("TaxDetails"))
    )
    .union(
      explodedTaxDetail.select($"Name", $"Version",
        to_json(struct(col("TaxDetail.TaxDetail3").as("TaxDetail3"))).as("TaxDetails"))
    )
    .union(
      explodedTaxDetail.select($"Name", $"Version",
        to_json(struct(col("TaxDetail.TaxDetail4").as("TaxDetail4"))).as("TaxDetails"))
    ).filter(!($"TaxDetails" === "{}")) // to_json yields "{}" when the field is null

  finaldf.show(false)
  finaldf.toJSON.show(false)
}
Result:

root
 |-- Details: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Id: string (nullable = true)
 |    |    |-- TaxDetails: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- TaxDetail1: string (nullable = true)
 |    |    |    |    |-- TaxDetail2: string (nullable = true)
 |    |    |    |    |-- TaxDetail3: string (nullable = true)
 |    |    |    |    |-- TaxDetail4: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Version: string (nullable = true)

+---------------------------------------------------+----+-------+
|Details                                            |Name|Version|
+---------------------------------------------------+----+-------+
|[[123, [[val1, val2,,]]], [234, [[,, val3, val4]]]]|json|1      |
+---------------------------------------------------+----+-------+

+---------------------------------------------------+----+-------+------------------------+---------------+
|Details                                            |Name|Version|Detail                  |TaxDetail      |
+---------------------------------------------------+----+-------+------------------------+---------------+
|[[123, [[val1, val2,,]]], [234, [[,, val3, val4]]]]|json|1      |[123, [[val1, val2,,]]] |[val1, val2,,] |
|[[123, [[val1, val2,,]]], [234, [[,, val3, val4]]]]|json|1      |[234, [[,, val3, val4]]]|[,, val3, val4]|
+---------------------------------------------------+----+-------+------------------------+---------------+

+----+-------+---------------------+
|Name|Version|TaxDetails           |
+----+-------+---------------------+
|json|1      |{"TaxDetail1":"val1"}|
|json|1      |{"TaxDetail2":"val2"}|
|json|1      |{"TaxDetail3":"val3"}|
|json|1      |{"TaxDetail4":"val4"}|
+----+-------+---------------------+

The final output you expected:

+----------------------------------------------------------------------+
|value                                                                 |
+----------------------------------------------------------------------+
|{"Name":"json","Version":"1","TaxDetails":"{\"TaxDetail1\":\"val1\"}"}|
|{"Name":"json","Version":"1","TaxDetails":"{\"TaxDetail2\":\"val2\"}"}|
|{"Name":"json","Version":"1","TaxDetails":"{\"TaxDetail3\":\"val3\"}"}|
|{"Name":"json","Version":"1","TaxDetails":"{\"TaxDetail4\":\"val4\"}"}|
+----------------------------------------------------------------------+
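The four unions above can also be collapsed into a single pass: fold the optional struct fields into a map, explode the map once (which yields `key` and `value` columns), and keep only non-null entries. This is a sketch meant to drop into the same `JsonTest` program, reusing its `explodedTaxDetail` DataFrame:

```scala
// Alternative to the chained unions: one explode over a map of the four fields.
// map() permits null values, so TaxDetailN fields absent from a row become
// null entries, which the isNotNull filter drops afterwards.
val finaldf2 = explodedTaxDetail
  .select($"Name", $"Version",
    explode(map(
      lit("TaxDetail1"), col("TaxDetail.TaxDetail1"),
      lit("TaxDetail2"), col("TaxDetail.TaxDetail2"),
      lit("TaxDetail3"), col("TaxDetail.TaxDetail3"),
      lit("TaxDetail4"), col("TaxDetail.TaxDetail4"))))
  .filter($"value".isNotNull)
  .select($"Name", $"Version", to_json(map($"key", $"value")).as("TaxDetails"))

finaldf2.show(false)
```

This scans `explodedTaxDetail` once instead of four times, at the cost of still listing the field names explicitly in the map.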

