数据摄取:将动态文件从 S3 加载到 Snowflake

Posted

技术标签:

【中文标题】数据摄取:将动态文件从 S3 加载到 Snowflake【英文标题】:Data Ingestion: Load Dynamic Files from S3 to Snowflake 【发布时间】:2021-01-26 20:49:12 【问题描述】:

情况:每个月都有一个 csv 登陆 AWS S3。供应商根据需要从文件中添加/删除/修改列。所以架构是不提前知道的。要求是在 Snowflake 中动态创建一个表并将数据加载到所述表中。 Matillion 是我们的 ELT 工具。

这是我到目前为止所做的。

    设置 Lambda 以检测文件的到达,将其转换为 JSON,上传到另一个 S3 目录并将文件名添加到 SQS。 Matillion 检测 SQS 消息并将包含 JSON 数据的文件加载到 SF 表的 Variant 列中。 SF 存储过程采用变量列并根据 JSON 数据中的字段数生成一个表。 SF 中的 VARIANT 列仅在其 JSON 数据时才以这种方式工作。很遗憾,不支持 CSV。

这适用于 10,000 行。当我使用超过 1GB 的完整文件(超过 10M 行)运行它时,就会出现问题。它会在运行时因磁盘空间不足错误而使 lambda 作业崩溃。

这些是我目前想到的替代方案:

    将 EFS 卷附加到 lambda,并在上传到 S3 之前使用它来存储 JSON 文件。 JSON 数据文件比 CSV 文件大得多,我预计 json 文件大约为 10-20GB,因为该文件有超过 1000 万行。 Matillion 有一个 Excel 查询组件,它可以在其中获取标题并动态创建表并加载文件。我在想我可以将 CSV 中的标题行转换为 Lambda 中的 XLX 文件,将其传递给 Matillion,让它创建结构,然后在创建结构后加载 csv 文件。

我在这里还有哪些其他选择?考虑因素包括用于未来大型 CSV 或类似需求的良好可重复设计模式、EFS 成本、我是否充分利用了可用的工具?谢谢!!!

【问题讨论】:

当您说“供应商添加/修改/删除列”时,您能否详细说明一下,他们是针对所有文件执行此操作还是针对特定文件执行此操作?我想了解架构何时更改,整个数据集都更改了,而不是一些具有不同列号的文件而其他文件仍然具有不同的列号?如果您回答这些问题,很容易提出建议。 【参考方案1】:

为什么不将初始的 csv 文件拆分为多个文件,然后以与您当前相同的方式处理每个文件?

【讨论】:

【参考方案2】:

为什么要将 CSV 转换为 JSON; CSV 直接加载到表中,无需进行 JSON 特殊需要的任何数据转换,横向展平以将 json 转换为关系数据行;以及为什么不使用 Snowflake Snowpipe 功能将数据直接加载到 Snowflake 中而不使用 Matallion。您可以在加载到 Snowflake 之前将大型 csv 文件拆分为较小的块;这将有助于在 SF Warehouse 之间分配数据处理负载。

【讨论】:

您的所有建议都需要提前了解架构/列顺序。我们正在尝试以一种方式解决它,如果供应商更改文件中的列,我们可以根据文件中的列动态创建一个表。【参考方案3】:

我还使用 Matillion 将 CSV 文件从 SFTP 加载到 Snowflake,但不知道架构。

在我的过程中,我在 Snowflake 中创建了一个“临时”表,其中包含 50 个 VARCHAR 列(我们的文件不应超过 50 列)。我们的数据总是包含文本、日期或数字,因此 VARCHAR 不是问题。然后我可以将 .csv 文件加载到临时表中。我相信这也适用于来自 S3 的文件。

这至少会将数据导入 Snowflake。但是,鉴于您的情况,我不确定如何创建“最终”表。 我可以想象能够使用标题行,和/或对每列中包含的数据的“类型”进行一些分析,以确定所需的列类型。

但是,如果您可以创建“最终”表,则可以将数据从 temp.或者改变临时表本身。

【讨论】:

【参考方案4】:

这可以使用外部表来实现,其中外部表将映射到单个列,并且分隔符将是换行符。外部表还有一个特殊的虚拟列,可以对其进行处理以动态提取所有列,然后使用存储过程在任何给定时间根据列数创建一个表。有一个有趣的视频讨论了雪花中的这种限制 (https://youtu.be/hRNu58E6Kmg)

【讨论】:

以上是关于数据摄取:将动态文件从 S3 加载到 Snowflake的主要内容,如果未能解决你的问题,请参考以下文章

Snowpipe 自动摄取配置看起来会触发所有管道

Snowpipe 自动摄取

来自 S3 最佳实践的 Snowpipe 连续摄取

使用查询将多个 excel 文件摄取到 MySQL

使用复制命令将数据从 s3 加载到 redshift

无法从 S3 存储桶(镶木地板文件)将数据加载到 EMR 上的猪中