雪花 CDC 从 S3 到雪花

Posted

技术标签:

【中文标题】雪花 CDC 从 S3 到雪花【英文标题】:Snowflake CDC from S3 to Snowflake 【发布时间】:2021-05-12 15:03:01 【问题描述】:

我有 S3 存储桶,它每天早上从 oracle 获取数据到 S3。 通过使用 Snowpipe,我将数据加载到名为 t1 的 Snowflake 表中。 现在我正在创建一个新表 t2,它将包含来自表 t1 的 cdc 数据。

我知道我们可以使用任务和流来捕获这一点。然而,流将捕获记录上的事件插入、更新或删除。但在我们的例子中,我们将数据附加到 t1 中,并尝试将 id 列的更新加载到 t2 中。

例如在第 1 天,表 T1 将被加载

Table t1
id      salary      load_date
--      ------      ----------
1       12000       12-03-2021
2       32000       12-03-2021
3       33000       12-03-2021

表 t2 将直接从 t1 加载

表 t2

id      salary      load_date
--      ------      ----------
1       12000       12-03-2021
2       32000       12-03-2021
3       33000       12-03-2021

现在是第 2 天,因为我们直接将数据附加到表 t1 中。 看起来是这样的

id      salary      load_date
--      ------      ----------
1       12000       12-03-2021
2       32000       12-03-2021
3       33000       12-03-2021
5       12500       13-03-2021
2       45000       13-03-2021

现在表 t2 应该有 id 2 的更新值和 id 5 的新值, 如下

id      salary      load_date
--      ------      ----------
1       12000       12-03-2021
2       45000       12-03-2021
3       33000       12-03-2021
5       12500       13-03-2021

我感觉流不会有帮助,因为主表 (t1) 中没有更新, 因此计划在 t1 的表 t2 上使用带有合并语句的任务 像

CREATE OR REPLACE TASK EMPLOYEES_CDC
  WAREHOUSE = COMPUTE_WH
  SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles'
AS
merge INTO t2 using (select * from t2 where load_date = current_date) t3 on t2.id = t3.id
      when matched then update t2.salary = t1.salary
      when not matched then insert (id,salary,load_date) values (t3.id,t3.salary,t3.load_date)

因为合并看起来有点贵。

请建议这是最佳方式还是有更好的方式。

【问题讨论】:

如何识别 T1 中的删除?是否有另一列包含操作类型或其他内容? @SimonDarr 没有删除.. 只需每天将表直接从 S3 追加到 t1。不考虑删除更新。并且操作类型没有其他列 由于您的源系统从不删除数据,因此我认为您的一般附加/合并过程很好。 @danD,我有类似的要求,我不能使用流,你能帮助理解你的查询 @Asher 您对解决方案的哪一部分感到困惑。您使用流面临的问题是什么 【参考方案1】:

您描述的解决方案是解决此问题的最佳方法。 流肯定会有所帮助,您只需将合并语句更改为使用流而不是表t1。因此,您的合并语句将只处理增量。

【讨论】:

我同意使用 Stream,但我唯一想知道的是因为在表 t1 中它总是直接插入并且没有更新.. 那么流的用途是什么。是不是它会与实际表相同。比如它会有什么不同。 Stream 会有新的记录插入到 t1,意思是 delta,如果你在合并中使用 t1,你需要找出最新的记录 Dan 正在使用加载日期来识别最新记录。由于表格自然会按该列聚集,所以我认为它应该和使用流一样快?【参考方案2】:

您为什么不参考这个content,它可以帮助您确定如何应用流。

您需要确保在外部表上创建流然后使用它。虽然外部表上的流有一些限制,但您也要检查您的用例。

【讨论】:

以上是关于雪花 CDC 从 S3 到雪花的主要内容,如果未能解决你的问题,请参考以下文章

每天从 S3 存储桶加载一个新文件到雪花表

如何检查从 aws S3 到雪花的数据加载结果

从 AWS S3 复制到雪花,同时包含函数

将数据从雪花卸载到 s3 时,如何将日期时间戳添加到 zip 文件?

预计将解析从 S3 加载的雪花数据中的列时到达记录末尾

如何将查询结果从雪花直接上传到 S3?