AWS Glue:如何处理具有不同架构的嵌套 JSON
Posted
技术标签:
【中文标题】AWS Glue:如何处理具有不同架构的嵌套 JSON【英文标题】:AWS Glue: How to handle nested JSON with varying schemas 【发布时间】:2018-09-02 15:24:41 【问题描述】:目标: 我们希望使用 AWS Glue 数据目录为驻留在 S3 存储桶中的 JSON 数据创建一个表,然后我们将通过 Redshift Spectrum 对其进行查询和解析。
背景: JSON 数据来自 DynamoDB Streams,并且嵌套很深。第一级 JSON 有一组一致的元素:Keys、NewImage、OldImage、SequenceNumber、ApproximateCreationDateTime、SizeBytes 和 EventName。唯一的变化是有些记录没有NewImage,有些没有OldImage。但是,在这第一级之下,架构变化很大。
理想情况下,我们希望使用 Glue 仅解析 JSON 的第一级,并且基本上将较低级别视为大型 STRING 对象(然后我们将根据需要使用 Redshift Spectrum 对其进行解析)。目前,我们正在将整个记录加载到 Redshift 中的单个 VARCHAR 列中,但记录接近 Redshift 中数据类型的最大大小(最大 VARCHAR 长度为 65535)。因此,我们希望在记录到达 Redshift 之前执行第一级解析。
到目前为止我们已经尝试/参考的内容:
将 AWS Glue Crawler 指向 S3 存储桶会导致数百个表具有一致的***架构(上面列出的属性),但在 STRUCT 元素的更深层次上会出现不同的架构。我们还没有找到一种方法来创建可以从所有这些表中读取并将其加载到单个表中的 Glue ETL 作业。 手动创建表没有成效。我们尝试将每一列设置为 STRING 数据类型,但该作业未能成功加载数据(可能是因为这将涉及从 STRUCT 到 STRING 的一些转换)。将列设置为 STRUCT 时,需要定义架构 - 但这正是从一条记录到另一条记录的不同,因此我们无法提供适用于所有相关记录的通用 STRUCT 架构。 AWS Glue Relationalize transform 很有趣,但不是我们在这种情况下要寻找的(因为我们希望保持部分 JSON 完整,而不是完全压平它)。 Redshift Spectrum 支持几周前的 scalar JSON 数据,但这不适用于我们正在处理的嵌套 JSON。这些似乎都无法帮助处理由 Glue Crawler 创建的数百个表格。问题: 我们将如何使用 Glue(或其他方法)来允许我们仅解析这些记录的第一级 - 同时忽略顶层元素下方的不同模式 - 以便我们可以从 Spectrum 访问它或将其物理加载到红移?
我是 Glue 的新手。我在 Glue 文档中花费了相当多的时间,并在论坛上浏览(有些稀疏的)信息。我可能会遗漏一些明显的东西——或者这可能是当前形式的 Glue 的限制。欢迎任何建议。
谢谢!
【问题讨论】:
您是否还希望所有不同的模式都可以一起访问?还是您更愿意将它们拆分到不同的表中? 嗨 Natalia,如果 Glue 需要在目录中创建多个表,我们希望能够一起访问这些表(例如通过将它们全部加载到 Glue 中的单个目标表中,我们然后可以通过 Spectrum 访问)。 【参考方案1】:到目前为止,这是 Glue 的限制。你看过胶水分类器吗?这是我唯一还没用过的,但可能适合你的需要。您可以为字段或类似内容定义 JSON 路径。
除此之外 - 胶水作业是要走的路。后台是 Spark,所以你几乎可以做任何事情。设置一个开发端点并使用它。在过去的三周里,我遇到了各种障碍,并决定完全放弃任何和所有 Glue 功能,只放弃 Spark,这样它既便携又实际工作。
在设置开发端点时可能需要记住的一件事是 IAM 角色必须具有“/”路径,因此您很可能需要手动创建一个具有此路径的单独角色。自动创建的路径为“/service-role/”。
【讨论】:
嗨 LauriK,感谢您的建议。我简要地尝试了分类器,但它们可能值得更深入地研究。在我尝试的几个分类器中,由于嵌套模式不同,Glue 仍在创建多个表。使用 Glue Jobs 时,是否有一种简单的方法可以为表输入指定通配符(“从前缀 = user_ 的所有表中提取”)?如果它必须指向 Glue Catalog 中的单个表,这似乎面临与上述相同的限制。 这是一个火花脚本,所以答案很可能是肯定的。文档在这里:spark.apache.org/docs/2.1.0/api/python/…【参考方案2】:我不确定您是否可以通过表定义来完成此操作,但您可以通过 ETL 作业通过使用映射函数将***值转换为 JSON 字符串来完成此操作。文档:[link]
import json
# Your mapping function
def flatten(rec):
for key in rec:
rec[key] = json.dumps(rec[key])
return rec
old_df = glueContext.create_dynamic_frame.from_options(
's3',
"paths": ['s3://...'],
"json")
# Apply mapping function f to all DynamicRecords in DynamicFrame
new_df = Map.apply(frame=old_df, f=flatten)
根据我的理解,您可以选择导出到 S3(可能是 Parquet 或其他一些列格式以优化查询)或直接导出到 Redshift,尽管我没有尝试过。
【讨论】:
【参考方案3】:你应该添加一个胶水分类器,最好是 $[*]
在s3爬取json文件时,会读取文件的第一行。
您可以创建一个粘合作业,以便将此 json 文件的数据目录表加载到 redshift 中。
我唯一的问题是 Redshift Spectrum 在读取数据目录中的 json 表时遇到问题..
如果您找到了解决方案,请告诉我
【讨论】:
【参考方案4】:我发现对浅层嵌套 json 有用的过程:
将第一级应用映射为datasource0
;
分解 struct
或 array
对象以摆脱元素级别
df1 = datasource0.toDF().select(id,col1,col2,...,explode(coln).alias(coln)
,其中explode
需要from pyspark.sql.functions import explode
;
通过intact_json = df1.select(id, itct1, itct2,..., itctm)
选择您希望保持原样的JSON对象;
将 df1
转换回 dynamicFrame 并将
dynamicFrame 以及通过dataframe.drop_fields(itct1, itct2,..., itctm)
删除完整的列;
将关系化表与基于“id”的完整表连接起来 列。
【讨论】:
resolvechoice = ResolveChoice.apply(frame = datasource, choice = "make_cols", transformation_ctx = "resolvechoice")
【参考方案5】:
截至 2018 年 12 月 20 日,我能够手动将具有第一级 json 字段的表定义为类型为 STRING 的列。然后在胶水脚本中,动态框架将列作为字符串。从那里,您可以在字段上执行json
类型的Unbox
操作。这将 json 解析字段并派生真正的模式。如果您可以遍历模式列表,则将Unbox
与Filter
结合使用可以让您循环并处理来自同一输入的异构 json 模式。
但是,请注意,这非常慢。我认为胶水在循环的每次迭代期间从 s3 下载源文件。我一直在尝试找到一种方法来保存初始源数据,但看起来.toDF
派生了字符串 json 字段的架构,即使您将它们指定为胶水 StringType。如果我能找到性能更好的解决方案,我会在这里添加评论。
【讨论】:
我能够通过使用胶水框架的内部_rdd
属性获得更好的性能来获得一个pyspark RDD。从那里,我做了一个映射,将消息类型作为键,称为 RDD 的 sortById
和 persist
。然后,我遍历每种不同类型的消息,并使用持久化的 RDD 对消息类型进行了 filter
values
以获取所有消息,然后是 repartition
。最后,回到胶合框架,我使用了create_dynamic_frame_from_rdd
。这似乎有更好的性能,因为它只是从源数据中读取 1 次。以上是关于AWS Glue:如何处理具有不同架构的嵌套 JSON的主要内容,如果未能解决你的问题,请参考以下文章
AWS Glue 数据目录,具有 S3 文件上的分区表和分区中的不同架构
如何处理 Clickhouse 的 AggregatingMergeTree 物化视图中的嵌套字段?