Spark:如何从 Spark 数据帧行解析和转换 json 字符串
Posted
技术标签:
【中文标题】Spark:如何从 Spark 数据帧行解析和转换 json 字符串【英文标题】:Spark: How to parse and transform json string from spark data frame rows 【发布时间】:2021-01-29 14:55:32 【问题描述】:如何从 pyspark 中的 spark 数据帧行中解析和转换 json 字符串?
我正在寻求如何解析的帮助:
json 字符串到 json 结构output 1
将json字符串转换为a、b和id列output 2
背景:我通过 API 获取具有大量行的 json 字符串(jstr1
、jstr2
、...),这些行保存到 spark df
。我可以分别读取每一行的模式,但这不是解决方案,因为模式有大量行,所以速度很慢。每个 jstr
具有相同的架构,列/键 a 和 b 保持不变,只是 id
和列中的值发生变化。
编辑:使用 MapType 架构的 blackbishop 解决方案就像一个魅力 schema = "map<string, array<struct<a:int,b:int>>>"
问题扩展到: How to transform JSON string with multiple keys, from spark data frame rows in pyspark?
from pyspark.sql import Row
jstr1 = '"id_1": ["a": 1, "b": 2, "a": 3, "b": 4]'
jstr2 = '"id_2": ["a": 5, "b": 6, "a": 7, "b": 8]'
df = sqlContext.createDataFrame([Row(json=jstr1),Row(json=jstr2)])
schema = F.schema_of_json(df.select(F.col("json")).take(1)[0].json)
df2 = df.withColumn('json', F.from_json(F.col('json'), schema))
df2.show()
当前输出:
+--------------------+
| json|
+--------------------+
|[[[1, 2], [3, 4]]] |
| []|
+--------------------+
所需的输出 1:
+--------------------+-------+
| json | id |
+--------------------+-------+
|[[[1, 2], [3, 4]]] | id_1 |
|[[[5, 6], [7, 8]]] | id_2 |
+--------------------+-------+
所需的输出 2:
+---------+----------+-------+
| a | b | id |
+--------------------+-------+
| 1 | 2 | id_1 |
| 3 | 4 | id_1 |
| 5 | 6 | id_2 |
| 7 | 8 | id_2 |
+---------+----------+-------+
【问题讨论】:
您能否指定从问题中的 api 调用获得的 json 行输出?或者你是否在一次 api 调用中得到一个 json 字符串? 这能回答你的问题吗? Pyspark: Parse a column of json strings @Chris 这不能回答问题,因为必须使用 MapType 模式来解决问题 【参考方案1】:第二行的结果为 null,因为您只使用了与第二行不同的第一行的架构。您可以将 JSON 解析为 MapType,其中键是字符串类型,值是结构数组类型:
schema = "map<string, array<struct<a:int,b:int>>>"
df = df.withColumn('json', F.from_json(F.col('json'), schema))
df.printSchema()
#root
# |-- json: map (nullable = true)
# | |-- key: string
# | |-- value: array (valueContainsNull = true)
# | | |-- element: struct (containsNull = true)
# | | | |-- a: integer (nullable = true)
# | | | |-- b: integer (nullable = true)
然后,通过一些简单的转换,您可以获得预期的输出:
id
列表示映射中的键,您可以通过map_keys
函数得到它
结构体<a:int, b:int>
表示您使用map_values
函数获得的值
output1 = df.withColumn("id", F.map_keys("json").getItem(0)) \
.withColumn("json", F.map_values("json").getItem(0))
output1.show(truncate=False)
# +----------------+----+
# |json |id |
# +----------------+----+
# |[[1, 2], [3, 4]]|id_1|
# |[[5, 6], [7, 8]]|id_2|
# +----------------+----+
output2 = output1.withColumn("attr", F.explode("json")) \
.select("id", "attr.*")
output2.show(truncate=False)
# +----+---+---+
# |id |a |b |
# +----+---+---+
# |id_1|1 |2 |
# |id_1|3 |4 |
# |id_2|5 |6 |
# |id_2|7 |8 |
# +----+---+---+
【讨论】:
以上是关于Spark:如何从 Spark 数据帧行解析和转换 json 字符串的主要内容,如果未能解决你的问题,请参考以下文章
基于其他列(即应用 CDC)将多个 spark 数据帧行组合成一个