雪花 CDC 从 S3 到雪花
Posted
技术标签:
【中文标题】雪花 CDC 从 S3 到雪花【英文标题】:Snowflake CDC from S3 to Snowflake 【发布时间】:2021-05-12 15:03:01 【问题描述】:我有 S3 存储桶,它每天早上从 oracle 获取数据到 S3。 通过使用 Snowpipe,我将数据加载到名为 t1 的 Snowflake 表中。 现在我正在创建一个新表 t2,它将包含来自表 t1 的 cdc 数据。
我知道我们可以使用任务和流来捕获这一点。然而,流将捕获记录上的事件插入、更新或删除。但在我们的例子中,我们将数据附加到 t1 中,并尝试将 id 列的更新加载到 t2 中。
例如在第 1 天,表 T1 将被加载
Table t1
id salary load_date
-- ------ ----------
1 12000 12-03-2021
2 32000 12-03-2021
3 33000 12-03-2021
表 t2 将直接从 t1 加载
表 t2
id salary load_date
-- ------ ----------
1 12000 12-03-2021
2 32000 12-03-2021
3 33000 12-03-2021
现在是第 2 天,因为我们直接将数据附加到表 t1 中。 看起来是这样的
id salary load_date
-- ------ ----------
1 12000 12-03-2021
2 32000 12-03-2021
3 33000 12-03-2021
5 12500 13-03-2021
2 45000 13-03-2021
现在表 t2 应该有 id 2 的更新值和 id 5 的新值, 如下
id salary load_date
-- ------ ----------
1 12000 12-03-2021
2 45000 12-03-2021
3 33000 12-03-2021
5 12500 13-03-2021
我感觉流不会有帮助,因为主表 (t1) 中没有更新, 因此计划在 t1 的表 t2 上使用带有合并语句的任务 像
CREATE OR REPLACE TASK EMPLOYEES_CDC
WAREHOUSE = COMPUTE_WH
SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles'
AS
merge INTO t2 using (select * from t2 where load_date = current_date) t3 on t2.id = t3.id
when matched then update t2.salary = t1.salary
when not matched then insert (id,salary,load_date) values (t3.id,t3.salary,t3.load_date)
因为合并看起来有点贵。
请建议这是最佳方式还是有更好的方式。
【问题讨论】:
如何识别 T1 中的删除?是否有另一列包含操作类型或其他内容? @SimonDarr 没有删除.. 只需每天将表直接从 S3 追加到 t1。不考虑删除更新。并且操作类型没有其他列 由于您的源系统从不删除数据,因此我认为您的一般附加/合并过程很好。 @danD,我有类似的要求,我不能使用流,你能帮助理解你的查询 @Asher 您对解决方案的哪一部分感到困惑。您使用流面临的问题是什么 【参考方案1】:您描述的解决方案是解决此问题的最佳方法。
流肯定会有所帮助,您只需将合并语句更改为使用流而不是表t1
。因此,您的合并语句将只处理增量。
【讨论】:
我同意使用 Stream,但我唯一想知道的是因为在表 t1 中它总是直接插入并且没有更新.. 那么流的用途是什么。是不是它会与实际表相同。比如它会有什么不同。 Stream 会有新的记录插入到 t1,意思是 delta,如果你在合并中使用 t1,你需要找出最新的记录 Dan 正在使用加载日期来识别最新记录。由于表格自然会按该列聚集,所以我认为它应该和使用流一样快?【参考方案2】:您为什么不参考这个content,它可以帮助您确定如何应用流。
您需要确保在外部表上创建流然后使用它。虽然外部表上的流有一些限制,但您也要检查您的用例。
【讨论】:
以上是关于雪花 CDC 从 S3 到雪花的主要内容,如果未能解决你的问题,请参考以下文章