Spark:如何解析嵌套列表的 JSON 字符串以触发数据框?

Posted

技术标签:

【中文标题】Spark:如何解析嵌套列表的 JSON 字符串以触发数据框?【英文标题】:Spark: How to parse JSON string of nested lists to spark data frame? 【发布时间】:2021-02-08 19:46:24 【问题描述】:

如何解析嵌套列表的 JSON 字符串以在 pyspark 中触发数据帧?

输入数据框:

+-------------+-----------------------------------------------+
|url          |json                                           |
+-------------+-----------------------------------------------+
|https://url.a|[[1572393600000, 1.000],[1572480000000, 1.007]]|
|https://url.b|[[1572825600000, 1.002],[1572912000000, 1.000]]|
+-------------+-----------------------------------------------+

root
 |-- url: string (nullable = true)
 |-- json: string (nullable = true)

预期输出:

+---------------------------------------+
|col_1 | col_2               | col_3    |
+---------------------------------------+
| a    | 1572393600000       |  1.000   | 
| a    | 1572480000000       |  1.007   |
| b    | 1572825600000       |  1.002   |
| b    | 1572912000000       |  1.000   |
+---------------------------------------+

示例代码:

import pyspark
import pyspark.sql.functions as F

spark = (pyspark.sql.SparkSession.builder.appName("Downloader_standalone")
    .master('local[*]')
    .getOrCreate())

sc = spark.sparkContext
from pyspark.sql import Row

rdd_list  = [('https://url.a','[[1572393600000, 1.000],[1572480000000, 1.007]]'),
             ('https://url.b','[[1572825600000, 1.002],[1572912000000, 1.000]]')]

jsons = sc.parallelize(rdd_list) 

df = spark.createDataFrame(jsons, "url string, json string")
df.show(truncate=False)
df.printSchema()


(df.withColumn('json', F.from_json(F.col('json'),"array<string,string>"))
.select(F.explode('json').alias('col_1', 'col_2', 'col_3')).show())

例子很少,但我不知道怎么做:

How to parse and transform json string from spark data frame rows in pyspark

How to transform JSON string with multiple keys, from spark data frame rows in pyspark?

【问题讨论】:

【参考方案1】:

通过在字符串中进行一些替换并通过拆分,您可以获得所需的结果:

from pyspark.sql import functions as F

df1 = df.withColumn(
    "col_1",
    F.regexp_replace("url", "https://url.", "")
).withColumn(
    "col_2_3",
    F.explode(
        F.expr("""transform(
            split(trim(both '][' from json), '\\\],\\\['), 
            x -> struct(split(x, ',')[0] as col_2, split(x, ',')[1] as col_3)
        )""")
    )
).selectExpr("col_1", "col_2_3.*")

df1.show(truncate=False)

#+-----+-------------+------+
#|col_1|col_2        |col_3 |
#+-----+-------------+------+
#|a    |1572393600000| 1.000|
#|a    |1572480000000| 1.007|
#|b    |1572825600000| 1.002|
#|b    |1572912000000| 1.000|
#+-----+-------------+------+

说明:

    trim(both '][' from json) :删除尾随和前导字符 [],得到类似:1572393600000, 1.000],[1572480000000, 1.007

    现在你可以用],[ 分割(\\\ 用于转义括号)

    transform 从拆分中获取数组,对于每个元素,它按逗号拆分并创建结构 col_2col_3

    分解从变换中获得的结构数组并星号展开结构列

【讨论】:

感谢您的回答,由于某种原因我得到了异常,您知道如何解决吗? AnalysisException: Can only star expand struct data types. Attribute: `ArrayBuffer(json)`; 效果很好,你能不能帮忙看看 F.expr("""transform 做了什么,什么是 "'\\\" 等等... 如果你能帮忙,最后一件事,如果有像“url&id=ID1234]2]0]”这样的网址,我想提取“ID1234”(id 总是 6 个字符),知道怎么做吗? @Dan 你可以使用 regexp_extract 函数:F.regexp_extract(F.col("url"), ".+(ID\\d4).+", 1)【参考方案2】:
df.select(df.url, F.explode(F.from_json(df.json,"array<string>")))
.select("url",F.from_json((F.col("col")),"array<string>").alias("col"))
.select("url",F.col("col").getItem(0),F.col("col").getItem(1))
.show(truncate=False)

+-------------+-------------+------+
|url          |col[0]       |col[1]|
+-------------+-------------+------+
|https://url.a|1572393600000|1.0   |
|https://url.a|1572480000000|1.007 |
|https://url.b|1572825600000|1.002 |
|https://url.b|1572912000000|1.0   |
+-------------+-------------+------+

【讨论】:

谢谢,在这种情况下,每对总是只有 2 个值。这很好用!

以上是关于Spark:如何解析嵌套列表的 JSON 字符串以触发数据框?的主要内容,如果未能解决你的问题,请参考以下文章

在scala中将spark决策树模型调试字符串转换为嵌套JSON

如何以角度解析嵌套的json

如何使用从 Flutter 中的 json 解析的嵌套映射列表中的函数创建对象

解析 JSON 字符串中的嵌套对象

在 Python 中通过嵌套的 Json/dict 解析

Spark:如何从 Spark 数据帧行解析和转换 json 字符串