Spark:如何从 Spark 数据帧行解析和转换 json 字符串

Posted

技术标签:

【中文标题】Spark:如何从 Spark 数据帧行解析和转换 json 字符串【英文标题】:Spark: How to parse and transform json string from spark data frame rows 【发布时间】:2021-01-29 14:55:32 【问题描述】:

如何从 pyspark 中的 spark 数据帧行中解析和转换 json 字符串?

我正在寻求如何解析的帮助:

json 字符串到 json 结构 output 1 将json字符串转换为a、b和id列output 2

背景:我通过 API 获取具有大量行的 json 字符串(jstr1jstr2、...),这些行保存到 spark df。我可以分别读取每一行的模式,但这不是解决方案,因为模式有大量行,所以速度很慢。每个 jstr 具有相同的架构,列/键 a 和 b 保持不变,只是 id 和列中的值发生变化。

编辑:使用 MapType 架构的 blackbishop 解决方案就像一个魅力 schema = "map<string, array<struct<a:int,b:int>>>"

问题扩展到: How to transform JSON string with multiple keys, from spark data frame rows in pyspark?

from pyspark.sql import Row
jstr1 = '"id_1": ["a": 1, "b": 2, "a": 3, "b": 4]'
jstr2 = '"id_2": ["a": 5, "b": 6, "a": 7, "b": 8]'
    
df = sqlContext.createDataFrame([Row(json=jstr1),Row(json=jstr2)])
    
schema = F.schema_of_json(df.select(F.col("json")).take(1)[0].json)
df2 = df.withColumn('json', F.from_json(F.col('json'), schema))
df2.show()

当前输出:

+--------------------+
|                json|
+--------------------+
|[[[1, 2], [3, 4]]]  |
|                  []|
+--------------------+

所需的输出 1:

+--------------------+-------+
|         json      |   id   |
+--------------------+-------+
|[[[1, 2], [3, 4]]] |   id_1 |
|[[[5, 6], [7, 8]]] |   id_2 |
+--------------------+-------+ 

所需的输出 2:

+---------+----------+-------+
|    a    |     b    |   id  |
+--------------------+-------+
|    1    |    2     |  id_1 |
|    3    |    4     |  id_1 |
|    5    |    6     |  id_2 |
|    7    |    8     |  id_2 |
+---------+----------+-------+
 

【问题讨论】:

您能否指定从问题中的 api 调用获得的 json 行输出?或者你是否在一次 api 调用中得到一个 json 字符串? 这能回答你的问题吗? Pyspark: Parse a column of json strings @Chris 这不能回答问题,因为必须使用 MapType 模式来解决问题 【参考方案1】:

第二行的结果为 null,因为您只使用了与第二行不同的第一行的架构。您可以将 JSON 解析为 MapType,其中键是字符串类型,值是结构数组类型:

schema = "map<string, array<struct<a:int,b:int>>>"

df = df.withColumn('json', F.from_json(F.col('json'), schema))

df.printSchema()
#root
# |-- json: map (nullable = true)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = true)
# |    |    |-- element: struct (containsNull = true)
# |    |    |    |-- a: integer (nullable = true)
# |    |    |    |-- b: integer (nullable = true)

然后,通过一些简单的转换,您可以获得预期的输出:

id 列表示映射中的键,您可以通过map_keys 函数得到它 结构体&lt;a:int, b:int&gt; 表示您使用map_values 函数获得的值
output1 = df.withColumn("id", F.map_keys("json").getItem(0)) \
            .withColumn("json", F.map_values("json").getItem(0))

output1.show(truncate=False)

# +----------------+----+
# |json            |id  |
# +----------------+----+
# |[[1, 2], [3, 4]]|id_1|
# |[[5, 6], [7, 8]]|id_2|
# +----------------+----+

output2 = output1.withColumn("attr", F.explode("json")) \
    .select("id", "attr.*")

output2.show(truncate=False)

# +----+---+---+
# |id  |a  |b  |
# +----+---+---+
# |id_1|1  |2  |
# |id_1|3  |4  |
# |id_2|5  |6  |
# |id_2|7  |8  |
# +----+---+---+

【讨论】:

以上是关于Spark:如何从 Spark 数据帧行解析和转换 json 字符串的主要内容,如果未能解决你的问题,请参考以下文章

Spark中具有固定向量的数据帧行的点积

将 spark 数据帧行写入 dynamoDB 表中的项目

基于其他列(即应用 CDC)将多个 spark 数据帧行组合成一个

Apache Spark:迭代数据帧行并通过 MutableList (Scala) 创建新数据帧

spark 如何从 JSON 推断数字类型?

Spark:如何从具有属性的多个嵌套 XML 文件转换为 Data Frame 数据