SnowFlake-Kafka 连接器 -> 登陆表 -> 目标表。如何清理登陆表
Posted
技术标签:
【中文标题】SnowFlake-Kafka 连接器 -> 登陆表 -> 目标表。如何清理登陆表【英文标题】:SnowFlake-Kafka connector -> Landing Table -> Destination table. How to cleanup Landing Table 【发布时间】:2020-05-12 05:31:08 【问题描述】:我正在调查从 Kafka -> SnowFlake/Kafka 连接器 -> SnowFlake 获取数据。不幸的是,连接器似乎只使用了两列(并将整个 JSON 有效负载放在一列中)。所以我创建了一个流/任务来定期将数据从登陆表复制到目标表(使用插入)。一切都运行良好,除了一旦登陆目标表就删除登陆表中的数据。使用流,我知道什么已经登陆。如何删除其余数据?截断似乎要快得多。我是否只是定期运行删除这些条目的删除任务?我还担心执行这些删除的仓库时间。谢谢
【问题讨论】:
您找到适合您的解决方案了吗?如果有,可以分享一下吗? 【参考方案1】:对于多个语句(如插入、删除等)访问相同更改记录的用例,将它们包围在显式事务语句(Begin..Commit)中,这将锁定流。
您可以有一个额外的列,如标志,使用 Begin 锁定流,使用流从暂存中插入到目标表,使用流执行第二次合并到暂存表以标记列标志。
https://docs.snowflake.com/en/user-guide/streams.html#examples
begin;
select * from <stream>;
insert into target_table select columns from <stream> where metadata$action='INSERT' and flag=0;
merge into staging_table st
using (
select column
from stream
where flag = 0) sc
on st.column=sc.column
when matched then update set st.flag=1;
commit;
delete from staging_table where flag=1;
【讨论】:
我为条件设置了什么?我已经为我的插入任务使用了“when SYSTEM$STREAM_HAS_DATA('kafka_check')”。然后我如何在基础表中说 delete record=0 to record = current,其中 current 是我刚刚在跟踪该表的流中使用的记录?我认为我的问题是如何将流状态转换回底层表以进行有效删除?谢谢 另一种解决方法是停止 kafka 连接器,使用流,截断临时表,重新创建流,然后再次打开连接器。 只暂停雪管而不是停止 Kafka 连接器会起作用吗?或者这是否会导致连接器内的错误?如果暂停有效,我会看到类似于:暂停管道 -> 使用数据 -> 截断 -> 重新创建流 -> 恢复雪管的路径。我们正在尝试解决完全相同的问题。以上是关于SnowFlake-Kafka 连接器 -> 登陆表 -> 目标表。如何清理登陆表的主要内容,如果未能解决你的问题,请参考以下文章