如何检查从 aws S3 到雪花的数据加载结果

Posted

技术标签:

【中文标题】如何检查从 aws S3 到雪花的数据加载结果【英文标题】:How to check results of data load from aws S3 to Snowflake 【发布时间】:2020-08-05 15:49:11 【问题描述】:

谁能告诉我如何检查从 S3 到 Snowflake 的数据加载结果。

加载过程是面向批处理的,我将文件从 Snowflake 中的某个位置拖放到 S3 存储桶中,我们通过查询从文件中读取的阶段将数据摄取到永久表中。

处理文件中的数据后,需要将文件移动到存档文件夹。

我正在为如何完成最后一步而苦恼?

我想到了一些选项,但不确定它们有多好 选项 1 - Snowflake 中的一个外部函数,它将在 AWS 中引发一个事件,该事件将触发一个 lambda 函数来移动文件。我觉得有点不靠谱

选项 2 - 将加载结果写入 Snowflake 中的审核表,然后从 aws 轮询该表并将其中列出的所有已处理文件移动到存档 - 可能有效,但有点老派,也不是实时的,所以会需要在 Snowflake 端进行额外查询以防止重复

选项 3 - 将文件直接写入舞台和存档文件夹,并使用加载命令的删除选项将其从 Snowflake 中删除 - 不理想并且有点变通方法。

提前非常感谢。我想没必要说我是 Snowflake 的新手 :-)

【问题讨论】:

【参考方案1】:

实际上,所有这些选项都是可行的。只是取决于你的喜好。但是,请注意选项 2。您不需要创建审计表。您可以直接在 Snowflake 中获取特定表或表的加载历史记录。 https://docs.snowflake.com/en/sql-reference/functions/copy_history.html

【讨论】:

谢谢迈克。我决定继续使用管道,因为它是一个简单的选择,即使它有一定的限制。我不知道的一件事是如何触发流向下的处理——只是雪花不支持触发器。所以我可以通过管道加载一个阶段表,然后我不能无缝处理数据。 您可以使用 Streams 和 Tasks 来模拟触发器。在数据加载到的表上添加了一个流。使用 WHEN SYSTEM$STREAM_HAS_DATA = True 选项频繁运行任务。一旦有东西进入流,任务将看到新数据并执行任务中的语句。文档或社区中有很多很好的例子可以帮助你。【参考方案2】:

这就是我在最近一次迁移中的做法。顺便说一句,我不明白需要在 S3 中写入文件。您可以直接写入更安全的内部阶段。 Snowflake 自动加密 S3 中的数据。如果您将文件复制到 S3(我假设您使用的是外部阶段),您将负责保护它的安全。无论如何,我如何在迁移中合并存档过程如下。

我使用 Python 在 Linux 服务器上使用 BCP 从 SQLSERVER 生成 CSV 转储。然后,python 框架拆分文件并使用 LINUX SPLIT 和 GZIP 命令对其进行压缩。发布后,它会对雪花内部阶段进行“PUT”。我检查 PUT 的输出,如果成功,我将文件移动到 LINUX 上的存档文件夹。我们有一个批处理程序,每周运行一次以清理存档文件夹。

【讨论】:

感谢 Rajib,据我了解,您从 Python 脚本中编排了所有内容,并且您的假设是,一旦文件在内部存储中,您就可以归档原始提取。假设 Python 脚本的职责是将文件传递到内部存储,这是有道理的。不幸的是,在我们的例子中,我们需要知道复制到的结果并理想地触发事件。 嗨安德烈,你的假设是正确的。我不知道你将如何在你的用例中实现这一点,而无需轮询进程的状态。我还没有在 Snowflake 中看到触发器支持。我对轮询机制几乎没有注意,尤其是在云上时。

以上是关于如何检查从 aws S3 到雪花的数据加载结果的主要内容,如果未能解决你的问题,请参考以下文章

从 AWS S3 复制到雪花,同时包含函数

预计将解析从 S3 加载的雪花数据中的列时到达记录末尾

雪花 CDC 从 S3 到雪花

如何将查询结果从雪花直接上传到 S3?

来自 S3(COPY) 的雪花负载数据与来自外部表的负载

将数据从aws s3导入mysql或任何非aws数据库