从 S3 读取大型 JSON 文件 (3K+) 并从数组中选择特定键

Posted

技术标签:

【中文标题】从 S3 读取大型 JSON 文件 (3K+) 并从数组中选择特定键【英文标题】:Read Large JSON files (3K+) from S3 and Select Specific Keys from Array 【发布时间】:2019-07-31 14:46:07 【问题描述】:

我需要读取存储在 s3 中的多个 JSON 文件 (3K+),所有这些文件都具有相同的结构。该结构非常大且嵌套。在这些文件中,是一个包含对象、键:值对的数组。我需要选择其中一些键并将值写入 PySpark 数据帧。我正在使用 PySpark/Python3 在 AWS Glue 中编写代码。

到目前为止,我一直尝试从 S3 文件创建数据框,然后推断架构。我不确定这是否正确,也不确定这是否是最有效的。我也不确定接下来在哪里找到“Products”数组并从数组中提取几个键。

json_data_frame = spark.read.json("s3://" + args['destinationBucketName'] + "/" + args['s3SourcePath'])

json_schema = spark.read.json(json_data_frame.rdd.map(lambda row: row.json)).schema

我想要的结果是一个包含列的数据框,每个列都是数组中的键,并具有来自整个 s3 文件的所有值。

编辑:我做得更进一步:

json_data_frame = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").json("s3://" + args['destinationBucketName'] + "/" + args['s3SourcePath'])

final_data_frame_prep = json_data_frame.withColumn("name", json_data_frame["products"].getItem("name")).withColumn("ndc_product_code", json_data_frame["products"].getItem("ndc_product_code"))

final_data_frame = final_data_frame_prep.select("name","ndc_product_code")

final_data_frame.show(20,False)

我现在在哪里创建数据框,正如我所怀疑的那样,除了每个值都是一个列表,有些是单个项目,有些是多个项目。我现在需要将列表分成单独的行。如果您有任何建议,我会很乐意在那里提供建议。当前数据框:

+------------------+----------------------+
|name |ndc_product_code |
+------------------+----------------------+
|[Refludan] |[50419-150] |
|[Erbitux, Erbitux]|[66733-948, 66733-958]|
+------------------+----------------------+

编辑2:

json_data_frame = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").json("s3://" + args['destinationBucketName'] + "/" + args['s3SourcePath'])

final_data_frame_prep = json_data_frame.withColumn("name", explode(json_data_frame["products"].getItem("name"))).withColumn("ndc_product_code", explode(json_data_frame["products"].getItem("ndc_product_code"))).withColumn("dosage_form", explode(json_data_frame["products"].getItem("dosage_form"))).withColumn("strength", explode(json_data_frame["products"].getItem("strength")))

final_data_frame = final_data_frame_prep.select("name","ndc_product_code","dosage_form","strength")

final_data_frame.show(20,False)

我能够在代码以及剩余的两列中添加爆炸,但在数据框中看到重复,好像列表匹配所有可能性,而不是匹配键所在数组中的对象来自。数据框现在是:

+--------+----------------+-----------+---------+
|name |ndc_product_code|dosage_form|strength |
+--------+----------------+-----------+---------+
|Refludan|50419-150 |Powder |50 mg/1mL|
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
+--------+----------------+-----------+---------+

编辑3: 我不相信爆炸是我想要的。我将代码恢复为编辑 1。表格显示为

+------------------+----------------------+
|name |ndc_product_code |
+------------------+----------------------+
|[Refludan] |[50419-150] |
|[Erbitux, Erbitux]|[66733-948, 66733-958]|
+------------------+----------------------+

而我想要的是:

+------------------+----------------------+
|name |ndc_product_code |
+------------------+----------------------+
|[Refludan]|[50419-150]|
|[Erbitux]|[66733-948]|
|[Erbitux]|[66733-958]|
+------------------+----------------------+

有没有办法做到这一点,即匹配数组中的位置并基于该位置创建新行?

【问题讨论】:

【参考方案1】:

我明白了!

+--------+----------------+-----------+---------+
|name |ndc_product_code|dosage_form|strength |
+--------+----------------+-----------+---------+
|Refludan|50419-150 |Powder |50 mg/1mL|
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
+--------+----------------+-----------+---------+

代码是:

# Read in the json files from s3
json_data_frame = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").json("s3://" + args['destinationBucketName'] + "/" + args['s3SourcePath'])

final_data_frame_prepprep = json_data_frame.withColumn("products_exp", explode(json_data_frame["products"]))\

final_data_frame_prep = final_data_frame_prepprep.withColumn("name", final_data_frame_prepprep["products_exp"].getItem("name"))\
                                             .withColumn("ndc_product_code", final_data_frame_prepprep["products_exp"].getItem("ndc_product_code"))\
                                             .withColumn("dosage_form", final_data_frame_prepprep["products_exp"].getItem("dosage_form"))\
                                             .withColumn("strength", final_data_frame_prepprep["products_exp"].getItem("strength"))

final_data_frame = final_data_frame_prep.select("name","ndc_product_code","dosage_form","strength")

final_data_frame.show(20,False)

关键是将数据作为一个整体展开,然后从数组中获取项目,然后选择要保留的内容。我希望这对其他人有所帮助--干杯

【讨论】:

以上是关于从 S3 读取大型 JSON 文件 (3K+) 并从数组中选择特定键的主要内容,如果未能解决你的问题,请参考以下文章

无法使用本地 PySpark 从 S3 读取 json 文件

当Spark从S3读取大文件时,可以将数据分发到不同的节点

Spark - 如何从 S3 读取具有文件名的多个 Json 文件

将大型 csv 文件从 S3 读入 R

Pyspark 从 S3 存储桶的子目录中读取所有 JSON 文件

从s3读取json文件以使用glueContext.read.json粘合pyspark会给出错误的结果