How to create structured JSON from CSV in AWS Glue

Posted: 2021-11-14 22:14:57

Question:

I am able to create structured JSON in the expected format, but extra slashes appear in the JSON records, and each JSON record shows up as a string object.

Please elaborate on a solution, let me know what is missing, or point out any other approach that achieves the expected result.

My current result:


   "awsservices":[
      "\"key\":\"string_value\", \"key\":numeric_value, \"key\":\"amazon\web/services\"",
      "\"key\":\"string_value\", \"key\":numeric_value, \"key\":\"amazon\web/services\"",
      "\"key\":\"string_value\", \"key\":numeric_value, \"key\":\"amazon\web/services\"",
      "\"key\":\"string_value\", \"key\":numeric_value, \"key\":\"amazon\web/services\""
   ]
 

Expected result:


   "awsservices":[
      "key":"string_value", "key":numeric_value, "key":"amazon web services",
      "key":"string_value", "key":numeric_value, "key":"amazon web services",
      "key":"string_value", "key":numeric_value, "key":"amazon web services",
      "key":"string_value", "key":numeric_value, "key":"amazon\web/services"
   ]
 

My code:

SourceDataDYF = glueContext.create_dynamic_frame.from_options(
   format_options = {"quoteChar": '"', "escaper": "", "withHeader": True, "separator": "|", "inferSchema": "false"},
   connection_type = "s3",
   format = "csv",
   connection_options = {"paths": ["s3://bucket_name/csv_file_path/"], "recurse": True},
   transformation_ctx = "SourceDataDYF"
)

StageDataDF = SourceDataDYF.toDF()

print("*******************************: WRITE JSON :*******************************")

# to_json turns each row into a JSON string; casting the collected
# array to string then serializes it a second time, which is what
# escapes the inner quotes.
PreStageDataDF1 = StageDataDF.select(to_json(struct(*StageDataDF.columns)).alias("json")) \
   .groupBy(spark_partition_id()) \
   .agg(collect_list("json").alias("awsservices")) \
   .select(col("awsservices").cast("string")).coalesce(1)

targetDataDYF = DynamicFrame.fromDF(PreStageDataDF1,glueContext,"PreStageDataDF1")
targetDataJSON = glueContext.write_dynamic_frame.from_options(
   frame = targetDataDYF,
   connection_type = "s3",
   connection_options = {"path": "s3://result_bucket_name/folder_path/", "partitionKeys": []},
   format = "json",
   transformation_ctx = "targetDataJSON"
)
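The "extra slashes" come from double serialization: `to_json` already produces a JSON string per row, and casting the collected array to string serializes those strings again. A minimal plain-Python sketch of the effect (the row and keys are hypothetical stand-ins, not the asker's actual data):

```python
import json

# Hypothetical row, standing in for one CSV record.
rows = [{"key": "amazon web services", "qnty": 1}]

# Step 1: each row becomes a JSON *string* (what to_json does).
json_rows = [json.dumps(r) for r in rows]

# Step 2: serializing the list of strings AGAIN (the effect of
# .cast("string")) escapes the inner quotes -- the "extra slashes".
double_encoded = json.dumps({"awsservices": json_rows})
print(double_encoded)

# Serializing the parsed objects only once yields clean JSON.
clean = json.dumps({"awsservices": rows})
print(clean)
```

Skipping the second stringification (in Glue terms, writing the struct/array column directly instead of casting it to string) avoids the escaping.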

Comments:

Since the data is basically correct, except that the "values" are a list of strings rather than dicts, what happens if you don't do .cast("string")? to_json should be enough; you don't need to stringify it again before writing. – JonSG

@JonSG, I tried the conversion logic in pandas and it worked, giving the expected JSON format.

Answer 1:

I created the JSON in the structured (expected) format using the following code:

StageDataDF = SourceDataDYF.toDF()
StageDataDF.schema
StageDataDF.show(5)

print("**************:Converting Spark DF to pandas DF:******************")
StageDataDF1 = StageDataDF.toPandas()
print(StageDataDF1)

print("#############: Converting into Python Dictionary:#############")
StageDataDF2 = StageDataDF1.to_dict(orient = 'records')
print(StageDataDF2)

print("#############: Converting Dictionary to 2D list:#############")
StageDataDF3 =[[ rec for rec in StageDataDF2 ]]
StageDataDF4 = {"awsservices": StageDataDF3}
print(StageDataDF4)

print("#############: Converting Python Dictionary to Pandas DataFrame:#############")
StageDataDF5 = pd.DataFrame(data = StageDataDF4)
print(type(StageDataDF5))

print("#############: Converting Pandas DataFrame to Spark DF again:#############")
StageDataDF6 = spark.createDataFrame(StageDataDF5)
print(type(StageDataDF6))
StageDataDF6.show()

print("*******************************: Target Dynamic Data Frame :*******************************")
targetDataDYF = DynamicFrame.fromDF(StageDataDF6, glueContext, "StageDataDF6").coalesce(1)
print(type(targetDataDYF))
targetDataDYF.show()

targetDataJSON = glueContext.write_dynamic_frame.from_options(
   frame = targetDataDYF,
   connection_type = "s3",
   connection_options = {"path": "s3://bucket/folder/", "partitionKeys": []},
   format = "json",
   transformation_ctx = "targetDataJSON"
)
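The reshaping this answer performs (Spark DF → pandas records → nested list under one key → one-row frame) can be sketched in plain Python; the records below are hypothetical stand-ins for what `toPandas().to_dict(orient="records")` would return:

```python
import json

# Hypothetical parsed CSV rows, standing in for
# StageDataDF.toPandas().to_dict(orient="records").
records = [
    {"Process": "ZCF", "qnty": "1"},
    {"Process": "ABC", "qnty": "2"},
]

# Wrap all records under a single "awsservices" key, mirroring the
# answer's 2D list: one outer list element -> one row in the frame.
payload = {"awsservices": [records]}
print(json.dumps(payload))
```

Because the records stay Python dicts until the final write, they are serialized exactly once and no escaped quotes appear.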

Comments:

But the problem is that when writing the result in JSON format, some JSON values need to be numeric rather than strings. Example — resulting JSON format: "awsservices": [ {"Process": "ZCF", "qnty": "1", "DiText": "Related", "LastUpdtDate": "20210819180815", "Code": "008", "CvelCost": "94.9", "EndDate": "20140630000000"} ] --------------- expected JSON format: "awsservices": [ {"Process": "ZCF", "qnty": 1, "DiText": "Related", "LastUpdtDate": 20210819180815, "Code": 008, "CvelCost": 94.9, "EndDate": 20140630000000} ]. It would be great if someone could help here.
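One way to address this follow-up, sketched in plain Python with the field names from the comment (note that `008` with a leading zero is not valid as a JSON number, so such values have to stay strings):

```python
import json

def coerce_numbers(rec):
    """Convert numeric-looking string values to int/float where possible.

    Strings with a leading zero (e.g. "008") are kept as strings,
    because a JSON number cannot preserve the leading zero.
    """
    out = {}
    for key, val in rec.items():
        if isinstance(val, str) and not (
            len(val) > 1 and val.startswith("0") and "." not in val
        ):
            try:
                out[key] = int(val)
                continue
            except ValueError:
                pass
            try:
                out[key] = float(val)
                continue
            except ValueError:
                pass
        out[key] = val
    return out

rec = {"Process": "ZCF", "qnty": "1", "CvelCost": "94.9",
       "Code": "008", "LastUpdtDate": "20210819180815"}
print(json.dumps(coerce_numbers(rec)))
```

Applying such a function to each record before building the "awsservices" payload would emit the numeric fields unquoted.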
