使用 spark 2.3 解析 json 数据
Posted
技术标签:
【中文标题】使用 spark 2.3 解析 json 数据【英文标题】:parse json data with spark 2.3 【发布时间】:2021-11-03 09:54:30 【问题描述】:我有以下 json 数据:
"3200":
"id": "3200",
"value": [
"cat",
"dog"
]
,
"2000":
"id": "2000",
"value": [
"bird"
]
,
"2500":
"id": "2500",
"value": [
"kitty"
]
,
"3650":
"id": "3650",
"value": [
"horse"
]
这个数据的schema,我们用spark加载数据后的printSchema工具如下:
root
|-- 3200: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- value: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- 2000: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- value: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- 2500: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- value: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- 3650: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- value: array (nullable = true)
| | |-- element: string (containsNull = true)
我想得到以下数据框
id value
3200 cat
2000 bird
2500 kitty
3200 dog
3650 horse
如何进行解析以获得预期的输出
【问题讨论】:
【参考方案1】:使用 spark-sql
数据框步骤(与 Mohana 的回答相同)
val df = spark.read.json(Seq(jsonData).toDS())
构建临时视图
df.createOrReplaceTempView("df")
结果:
val cols_k = df.columns.map( x => s"`$x`.id" ).mkString(",")
val cols_v = df.columns.map( x => s"`$x`.value" ).mkString(",")
spark.sql(s"""
with t1 ( select map_from_arrays(array($cols_k),array($cols_v)) s from df ),
t2 ( select explode(s) (key,value) from t1 )
select key, explode(value) value from t2
""").show(false)
+----+-----+
|key |value|
+----+-----+
|2000|bird |
|2500|kitty|
|3200|cat |
|3200|dog |
|3650|horse|
+----+-----+
【讨论】:
你能用scala或者python写吗【参考方案2】:您可以使用stack()
函数转置数据帧,然后提取key
字段并使用explode_outer
函数分解value
字段。
val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val jsonData = """
| "3200":
| "id": "3200",
| "value": [
| "cat",
| "dog"
| ]
| ,
| "2000":
| "id": "2000",
| "value": [
| "bird"
| ]
| ,
| "2500":
| "id": "2500",
| "value": [
| "kitty"
| ]
| ,
| "3650":
| "id": "3650",
| "value": [
| "horse"
| ]
|
|
|""".stripMargin
val df = spark.read.json(Seq(jsonData).toDS())
df.selectExpr("stack (4, *) key")
.select(expr("key.id").as("key"),
explode_outer(expr("key.value")).as("value"))
.show(false)
+----+-----+
|key |value|
+----+-----+
|2000|bird |
|2500|kitty|
|3200|cat |
|3200|dog |
|3650|horse|
+----+-----+
【讨论】:
我们可以有一个替代方案,而不用硬编码 4 号 使用df.columns.length
以上是关于使用 spark 2.3 解析 json 数据的主要内容,如果未能解决你的问题,请参考以下文章
在 Python 中使用 Spark Streaming 解析 JSON 消息
毕设三 spark与phoenix集成插入数据/解析json数组