从 S3 读取大型 JSON 文件 (3K+) 并从数组中选择特定键
Posted
技术标签:
【中文标题】从 S3 读取大型 JSON 文件 (3K+) 并从数组中选择特定键【英文标题】:Read Large JSON files (3K+) from S3 and Select Specific Keys from Array 【发布时间】:2019-07-31 14:46:07 【问题描述】:我需要读取存储在 s3 中的多个 JSON 文件 (3K+),所有这些文件都具有相同的结构。该结构非常大且嵌套。在这些文件中,是一个包含对象、键:值对的数组。我需要选择其中一些键并将值写入 PySpark 数据帧。我正在使用 PySpark/Python3 在 AWS Glue 中编写代码。
到目前为止,我一直尝试从 S3 文件创建数据框,然后推断架构。我不确定这是否正确,也不确定这是否是最有效的。我也不确定接下来在哪里找到“Products”数组并从数组中提取几个键。
json_data_frame = spark.read.json("s3://" + args['destinationBucketName'] + "/" + args['s3SourcePath'])
json_schema = spark.read.json(json_data_frame.rdd.map(lambda row: row.json)).schema
我想要的结果是一个包含列的数据框,每个列都是数组中的键,并具有来自整个 s3 文件的所有值。
编辑:我做得更进一步:
json_data_frame = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").json("s3://" + args['destinationBucketName'] + "/" + args['s3SourcePath'])
final_data_frame_prep = json_data_frame.withColumn("name", json_data_frame["products"].getItem("name")).withColumn("ndc_product_code", json_data_frame["products"].getItem("ndc_product_code"))
final_data_frame = final_data_frame_prep.select("name","ndc_product_code")
final_data_frame.show(20,False)
我现在在哪里创建数据框,正如我所怀疑的那样,除了每个值都是一个列表,有些是单个项目,有些是多个项目。我现在需要将列表分成单独的行。如果您有任何建议,我会很乐意在那里提供建议。当前数据框:
+------------------+----------------------+
|name |ndc_product_code |
+------------------+----------------------+
|[Refludan] |[50419-150] |
|[Erbitux, Erbitux]|[66733-948, 66733-958]|
+------------------+----------------------+
编辑2:
json_data_frame = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").json("s3://" + args['destinationBucketName'] + "/" + args['s3SourcePath'])
final_data_frame_prep = json_data_frame.withColumn("name", explode(json_data_frame["products"].getItem("name"))).withColumn("ndc_product_code", explode(json_data_frame["products"].getItem("ndc_product_code"))).withColumn("dosage_form", explode(json_data_frame["products"].getItem("dosage_form"))).withColumn("strength", explode(json_data_frame["products"].getItem("strength")))
final_data_frame = final_data_frame_prep.select("name","ndc_product_code","dosage_form","strength")
final_data_frame.show(20,False)
我能够在代码以及剩余的两列中添加爆炸,但在数据框中看到重复,好像列表匹配所有可能性,而不是匹配键所在数组中的对象来自。数据框现在是:
+--------+----------------+-----------+---------+
|name |ndc_product_code|dosage_form|strength |
+--------+----------------+-----------+---------+
|Refludan|50419-150 |Powder |50 mg/1mL|
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
+--------+----------------+-----------+---------+
编辑3: 我不相信爆炸是我想要的。我将代码恢复为编辑 1。表格显示为
+------------------+----------------------+
|name |ndc_product_code |
+------------------+----------------------+
|[Refludan] |[50419-150] |
|[Erbitux, Erbitux]|[66733-948, 66733-958]|
+------------------+----------------------+
而我想要的是:
+------------------+----------------------+
|name |ndc_product_code |
+------------------+----------------------+
|[Refludan]|[50419-150]|
|[Erbitux]|[66733-948]|
|[Erbitux]|[66733-958]|
+------------------+----------------------+
有没有办法做到这一点,即匹配数组中的位置并基于该位置创建新行?
【问题讨论】:
【参考方案1】:我明白了!
+--------+----------------+-----------+---------+
|name |ndc_product_code|dosage_form|strength |
+--------+----------------+-----------+---------+
|Refludan|50419-150 |Powder |50 mg/1mL|
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
+--------+----------------+-----------+---------+
代码是:
# Read in the json files from s3
json_data_frame = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").json("s3://" + args['destinationBucketName'] + "/" + args['s3SourcePath'])
final_data_frame_prepprep = json_data_frame.withColumn("products_exp", explode(json_data_frame["products"]))\
final_data_frame_prep = final_data_frame_prepprep.withColumn("name", final_data_frame_prepprep["products_exp"].getItem("name"))\
.withColumn("ndc_product_code", final_data_frame_prepprep["products_exp"].getItem("ndc_product_code"))\
.withColumn("dosage_form", final_data_frame_prepprep["products_exp"].getItem("dosage_form"))\
.withColumn("strength", final_data_frame_prepprep["products_exp"].getItem("strength"))
final_data_frame = final_data_frame_prep.select("name","ndc_product_code","dosage_form","strength")
final_data_frame.show(20,False)
关键是将数据作为一个整体展开,然后从数组中获取项目,然后选择要保留的内容。我希望这对其他人有所帮助--干杯
【讨论】:
以上是关于从 S3 读取大型 JSON 文件 (3K+) 并从数组中选择特定键的主要内容,如果未能解决你的问题,请参考以下文章
无法使用本地 PySpark 从 S3 读取 json 文件
Spark - 如何从 S3 读取具有文件名的多个 Json 文件