使用 spark 2.3 解析 json 数据

Posted

技术标签:

【中文标题】使用 spark 2.3 解析 json 数据【英文标题】:parse json data with spark 2.3 【发布时间】:2021-11-03 09:54:30 【问题描述】:

我有以下 json 数据:


  "3200": 
    "id": "3200",
    "value": [
      "cat",
      "dog"
    ]
  ,
  "2000": 
    "id": "2000",
    "value": [
      "bird"
    ]
  ,
  "2500": 
    "id": "2500",
    "value": [
      "kitty"
    ]
  ,
  "3650": 
     "id": "3650",
      "value": [
      "horse"
    ]
  

这个数据的schema,我们用spark加载数据后的printSchema工具如下:

    root
 |-- 3200: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- value: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- 2000: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- value: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- 2500: struct (nullable = true)
 |     |-- id: string (nullable = true)
 |    |-- value: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- 3650: struct (nullable = true)
 |   |-- id: string (nullable = true)
 |    |-- value: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

我想得到以下数据框

id    value

 3200  cat
 2000  bird
 2500  kitty
 3200  dog
 3650  horse 

如何进行解析以获得预期的输出

【问题讨论】:

【参考方案1】:

使用 spark-sql

数据框步骤(与 Mohana 的回答相同)

val df = spark.read.json(Seq(jsonData).toDS())

构建临时视图

df.createOrReplaceTempView("df")

结果:

val cols_k = df.columns.map( x => s"`$x`.id" ).mkString(",")
val cols_v = df.columns.map( x => s"`$x`.value" ).mkString(",")
spark.sql(s""" 
with t1 ( select map_from_arrays(array($cols_k),array($cols_v)) s from df ),
     t2 ( select explode(s) (key,value) from t1 )
     select key, explode(value) value from t2

""").show(false)

+----+-----+
|key |value|
+----+-----+
|2000|bird |
|2500|kitty|
|3200|cat  |
|3200|dog  |
|3650|horse|
+----+-----+

【讨论】:

你能用scala或者python写吗【参考方案2】:

您可以使用stack() 函数转置数据帧,然后提取key 字段并使用explode_outer 函数分解value 字段。

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._

val jsonData = """
                   |  "3200": 
                   |    "id": "3200",
                   |    "value": [
                   |      "cat",
                   |      "dog"
                   |    ]
                   |  ,
                   |  "2000": 
                   |    "id": "2000",
                   |    "value": [
                   |      "bird"
                   |    ]
                   |  ,
                   |  "2500": 
                   |    "id": "2500",
                   |    "value": [
                   |      "kitty"
                   |    ]
                   |  ,
                   |  "3650": 
                   |     "id": "3650",
                   |      "value": [
                   |      "horse"
                   |    ]
                   |  
                   |
                   |""".stripMargin

val df = spark.read.json(Seq(jsonData).toDS())

df.selectExpr("stack (4, *) key")
    .select(expr("key.id").as("key"),
      explode_outer(expr("key.value")).as("value"))
    .show(false)

+----+-----+
|key |value|
+----+-----+
|2000|bird |
|2500|kitty|
|3200|cat  |
|3200|dog  |
|3650|horse|
+----+-----+

【讨论】:

我们可以有一个替代方案,而不用硬编码 4 号 使用df.columns.length

以上是关于使用 spark 2.3 解析 json 数据的主要内容,如果未能解决你的问题,请参考以下文章

在 Python 中使用 Spark Streaming 解析 JSON 消息

使用Spark解析多个JSON模式

避免在 Spark 中解析 json 子字段

毕设三 spark与phoenix集成插入数据/解析json数组

使用 Spark 使用包含结构的结构的数组进行 Json 解析

Spark:如何从 Spark 数据帧行解析和转换 json 字符串