使用 Pyspark 处理 JSON 结构

Posted

技术标签:

【中文标题】使用 Pyspark 处理 JSON 结构【英文标题】:Handle JSON structure with Pyspark 【发布时间】:2019-09-11 01:25:55 【问题描述】:

我是 spark 新手,并试图将以下格式的 JSON 文件读入 spark 数据帧。这是我的 JSON 格式

“元素”:[

Q4

Name:ABC,
Language:English,
Age:45,
Title:SWE

,

Q5

Name:DEF,
Language:English,
Age:60
Title: Engineer
,

Q6

Name:HIJ,
Language:English,
Age:57,
Title:

] 我希望输出是

Name | Language | Age | Title     
ABC  | English  | 45  |   SWE    

DEF  | English  | 60  | Engineer  

HIJ  | English  | 57  |   Null       

如何使用 pyspark 实现这一目标?

【问题讨论】:

【参考方案1】:

请尝试使用

df=spark.read.json()

读取文件。它会将您的数据转换为数据框格式。如果您需要元素内的文档,您可能需要选择 JSON 元素。

--编辑部分,如果你想使用硬编码字符串,请参考 spark doc: 来自 spark 文档的示例内容。

sc = spark.sparkContext
jsonStrings = ['"name":"Yin","address":"city":"Columbus","state":"Ohio"']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
# +---------------+----+
# |        address|name|
# +---------------+----+
# |[Columbus,Ohio]| Yin|
# +---------------+----+

--Edit2 以您的示例为例,但我只选择了在此处创建数据框所需的数据。我希望,这对你有用。

    import os
    import sys
    from pyspark.sql import SparkSession
    import json
    from pyspark.sql import Row

    spark = SparkSession.builder.master("local").getOrCreate()



    json_doc1='"elements": "Q4":"Name":"ABC","Language":"English","Age":45,"Title":"SWE","Q5": "Name":"DEF","Language":"English","Age":60,"Title": "Engineer"'
    test=json.loads(json_doc1)
    data1=test['elements'].values()
        print (data1)

    #rddd1= sc.parallelize()
    df1=spark.createDataFrame(Row(**x) for x in data1)


    df1.show()


+---+--------+----+--------+
|Age|Language|Name|   Title|
+---+--------+----+--------+
| 60| English| DEF|Engineer|
| 45| English| ABC|     SWE|
+---+--------+----+--------+

谢谢, 手动

【讨论】:

谢谢。从您的示例中,如果我想将 Name、City 和 State 作为我的数据框上的列并删除 address struct ,我该怎么做? 嘿,您需要做的第一件事是..更正 JSON 文档。您在此处给出的 JSON 不正确。您可以使用在线工具轻松验证 JSON。 如果你的工作完成了,更新总是好的。如果没有,那么你也应该更新。

以上是关于使用 Pyspark 处理 JSON 结构的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark 从数据框创建 json 结构

如何使用 pySpark 使多个 json 处理更快?

在 PySpark 中使用 rdd.map 解压和编码字符串

Pyspark 将嵌套结构字段转换为 Json 字符串

kafka 到 pyspark 结构化流,将 json 解析为数据帧

尝试使用 JSON 结构将时间箱转换为 Pyspark 中的分钟和小时数组