SnowFlake-Kafka 连接器 -> 登陆表 -> 目标表。如何清理登陆表

Posted

技术标签:

【中文标题】SnowFlake-Kafka 连接器 -> 登陆表 -> 目标表。如何清理登陆表【英文标题】:SnowFlake-Kafka connector -> Landing Table -> Destination table. How to cleanup Landing Table 【发布时间】:2020-05-12 05:31:08 【问题描述】:

我正在调查从 Kafka -> SnowFlake/Kafka 连接器 -> SnowFlake 获取数据。不幸的是,连接器似乎只使用了两列(并将整个 JSON 有效负载放在一列中)。所以我创建了一个流/任务来定期将数据从登陆表复制到目标表(使用插入)。一切都运行良好,除了一旦登陆目标表就删除登陆表中的数据。使用流,我知道什么已经登陆。如何删除其余数据?截断似乎要快得多。我是否只是定期运行删除这些条目的删除任务?我还担心执行这些删除的仓库时间。谢谢

【问题讨论】:

您找到适合您的解决方案了吗?如果有,可以分享一下吗? 【参考方案1】:

对于多个语句(如插入、删除等)访问相同更改记录的用例,将它们包围在显式事务语句(Begin..Commit)中,这将锁定流。

您可以有一个额外的列,如标志,使用 Begin 锁定流,使用流从暂存中插入到目标表,使用流执行第二次合并到暂存表以标记列标志。

https://docs.snowflake.com/en/user-guide/streams.html#examples

begin;

select * from <stream>;

insert into target_table select columns from <stream> where metadata$action='INSERT' and flag=0;

merge into staging_table st
using (
select column
  from stream
  where flag = 0) sc
  on st.column=sc.column
  when matched then update set st.flag=1;
commit;
delete from staging_table where flag=1;

【讨论】:

我为条件设置了什么?我已经为我的插入任务使用了“when SYSTEM$STREAM_HAS_DATA('kafka_check')”。然后我如何在基础表中说 delete record=0 to record = current,其中 current 是我刚刚在跟踪该表的流中使用的记录?我认为我的问题是如何将流状态转换回底层表以进行有效删除?谢谢 另一种解决方法是停止 kafka 连接器,使用流,截断临时表,重新创建流,然后再次打开连接器。 只暂停雪管而不是停止 Kafka 连接器会起作用吗?或者这是否会导致连接器内的错误?如果暂停有效,我会看到类似于:暂停管道 -> 使用数据 -> 截断 -> 重新创建流 -> 恢复雪管的路径。我们正在尝试解决完全相同的问题。

以上是关于SnowFlake-Kafka 连接器 -> 登陆表 -> 目标表。如何清理登陆表的主要内容,如果未能解决你的问题,请参考以下文章

新品发布 | RPC-1.35 90GHz精密连接器

MySQL连接查询之内连接左连接右连接自连接

在同一连接器的流程中使用自定义连接器的组件

MySQL连接查询 内连接和外连接的区别

工业M5连接器圆形连接器IP68

如何根据连接器名称获取 Kafka 源连接器架构