Pyspark 将 JSON 读取为 dict 或 struct 而不是数据帧/RDD

Posted

技术标签:

【中文标题】Pyspark 将 JSON 读取为 dict 或 struct 而不是数据帧/RDD【英文标题】:Pyspark read a JSON as a dict or struct not a dataframe/RDD 【发布时间】:2020-01-29 13:55:45 【问题描述】:

我有一个保存在 S3 中的 JSON 文件,我试图在 PySpark 中打开/读取/存储/任何内容作为字典或结构。它看起来像这样:


    "filename": "some_file.csv",
    "md5": "md5 hash",
    "client_id": "some uuid",
    "mappings": 
        "shipping_city": "City",
        "shipping_country": "Country",
        "shipping_zipcode": "Zip",
        "shipping_address1": "Street Line 1",
        "shipping_address2": "Street Line 2",
        "shipping_state_abbreviation": "State"
    

我想从 S3 中读取它并将其存储为字典或结构。当我这样读时:

inputJSON = "s3://bucket/file.json"
dfJSON = sqlContext.read.json(inputJSON, multiLine=True)

我得到一个删除映射的数据框,如下所示:

+---------+-------------+----------------------------------------------------------+-------+
|client_id|filename     |mappings                                                  |md5    |
+-----------------------+----------------------------------------------------------+-------+
|some uuid|some_file.csv|[City, Country, Zip, Street Line 1, Street Line 2, State] |md5hash|
+-----------------------+----------------------------------------------------------+-------+

是否可以打开文件并将其读入字典,以便我可以访问映射或其他类似的东西?:

jsonDict = inputFile
mappingDict = jsonDict['mappings']

【问题讨论】:

【参考方案1】:

我能够通过将 boto3 添加到 EMR 集群并使用以下代码来解决此问题:

import boto3
import json

s3 = boto3.resource('s3')
obj = s3.Object('slm-transaction-incoming','All_Starbucks_Locations_in_the_US.json')
string = obj.get()['Body'].read().decode('utf-8')

json = json.loads(string)

可以通过在 EMR 终端中键入以下内容来添加 boto3:

sudo pip-3.6 install boto3

【讨论】:

【参考方案2】:

你可以试试这样的:

inputJSON = "/tmp/some_file.json"
dfJSON = spark.read.json(inputJSON, multiLine=True)

dfJSON.printSchema()


root
 |-- client_id: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- mappings: struct (nullable = true)
 |    |-- shipping_address1: string (nullable = true)
 |    |-- shipping_address2: string (nullable = true)
 |    |-- shipping_city: string (nullable = true)
 |    |-- shipping_country: string (nullable = true)
 |    |-- shipping_state_abbreviation: string (nullable = true)
 |    |-- shipping_zipcode: string (nullable = true)
 |-- md5: string (nullable = true)


dict_mappings = dfJSON.select("mappings").toPandas().set_index('mappings').T.to_dict('list')

dict_mappings

Row(shipping_address1='Street Line 1', shipping_address2='Street Line 2', shipping_city='City', shipping_country='Country', shipping_state_abbreviation='State', shipping_zipcode='Zip'): []

OR(没有 Pandas)

list_map = map(lambda row: row.asDict(), dfJSON.select("mappings").collect())
dict_mappings2 = t['mappings']: t for t in list_map

【讨论】:

欣赏解决方案,但我需要为此安装 pandas,这不是我能做的。 请尝试不使用 Pandas 的替代方案 - 我刚刚更新了我的答案。不过,它可能需要一些调整。希望这会有所帮助!

以上是关于Pyspark 将 JSON 读取为 dict 或 struct 而不是数据帧/RDD的主要内容,如果未能解决你的问题,请参考以下文章

使用 Pyspark 将每个 json 对象读取为 Dataframe 中的单行?

将 JSON 多行文件加载到 pyspark 数据框中

Python之dict(或对象)与json之间转化

Python之dict(或对象)与json之间的互相转化

Python之dict(或对象)与json之间的互相转化

Python中:dict(或对象)与json之间的互相转化