如何截断和加载表中的部分数据?
Posted
技术标签:
【中文标题】如何截断和加载表中的部分数据?【英文标题】:How to truncate and load part of data in a table? 【发布时间】:2020-09-24 06:32:08 【问题描述】:我使用的是没有分区机制的雪花表,而是有微分区。 我们有一个新的要求,即在桌子上执行不同类型的满载。
场景: 我有一个包含三列的表:ID、名称、Current_Location 第一天的记录:
Se Name Current_Location Rate
1 A L1 100
2 B L2 200
3 C L3 300
4 D L4 400
5 E L5 500
6 F L6 600
7 G L7 700
8 H L7 800
我的要求是我每天以不同的速率为每个 Current_Location
获取新数据,即
第2天:
Se Name Current_Location Rate
6 P L6 6000
7 G L7 7000
8 H L7 1100
9 Z L7 1200
根据列中的值:Current_Location
,我先截断了以前的记录,然后加载新记录。例如,使用 Current_Location L7
在上述场景中,第 1 天的 L7 有两条记录,但在第 2 天,我得到了 3 条记录。
所以我必须截断
7 G L7 700
8 H L7 800
然后将 Day2 中的所有三个新记录加载到我的表中。 L6也是如此。截断和加载后的最终表应如下所示:
Se Name Current_Location Rate
1 A L1 100
2 B L2 200
3 C L3 300
4 D L4 400
5 E L5 500
6 P L6 6000
7 G L7 7000
8 H L7 1100
9 Z L7 1200
为了实现这一点,我想到了实现:
-
将所有新数据插入到 target_table 中。此时,target_table 也包含新旧记录。
然后通过选择 max(Current_Location) 在其上创建一个视图并将其公开给用户。
create or replace view final_view as select * from target_table where Current_Location = (select max(Current_Location) from target_table)
但是我必须稍后用旧的 Current_Location 截断旧记录,一旦我删除它们,我的表和视图就会变得相同。 我正在使用 Snowflake 数据库,尽管有微分区,但其中没有分区的概念。 有没有什么有效的方法来做同样的操作?
Edit1:我探索了合并查询:
MERGE INTO t1 USING t2 ON t1.t1Key = t2.t2Key
WHEN MATCHED AND t2.marked = 1 THEN DELETE
WHEN MATCHED AND t2.isNewStatus = 1 THEN UPDATE SET val = t2.newVal, status = t2.newStatus
WHEN MATCHED THEN UPDATE SET val = t2.newVal
WHEN NOT MATCHED THEN INSERT (val, status) VALUES (t2.newVal, t2.newStatus);
一旦一个案例被执行,是否也会检查下一个案例?
例如:一行匹配 DELETE 条件,它被删除。现在我必须将具有相同键的行插入表中。 WHEN NOT MACTHED
的案子也会被执行吗?或者一旦一个case被执行,控件就退出MERGE Query?
【问题讨论】:
我认为您需要在此处提供更多信息。此外,这看起来像是您已经问过的重复问题?这里:***.com/questions/64044360/… 是的,我有更多关于这个问题的信息,因为我有办法做到这一点(我还在那个问题中发布了我的做法)。我正在编辑它,但在将我的更改从记事本添加到浏览器之前错误地保存了它。就在不久前为主持人干预提出了一个标志。所以我在那里发布了这个问题。 【参考方案1】:我看到MERGE
语句会造成麻烦,因为目标表中的行将与源中的多行连接,即如here 所解释的那样是不确定的
你呢?首先delete
使用来自新数据集的唯一值Current_Location
的记录,然后insert
收到所有新记录?
DELETE FROM T1
WHERE Current_Location in (SELECT DISTINCT Current_Location FROM T2);
INSERT INTO T1
SELECT * FROM T2;
这似乎产生了您需要的决赛桌,如果我遗漏了什么,请纠正我。
为了跟踪此类操作,您可以引入两个历史表 T3、T4,其中 T3 保存每日删除的记录以及删除日期/时间戳,T4 保存新收到的记录以及插入日期/时间戳
所以首先,执行类似
INSERT INTO T3
Select *, CURRENT_DATE // Or CURRENT_TIMESTAMP safer if multiple runs received on same day
FROM T1
WHERE Current_Location in (SELECT DISTINCT Current_Location FROM T2);
INSERT INTO T4
Select *, CURRENT_DATE // Or CURRENT_TIMESTAMP safer if multiple runs received on same day
FROM T2;
然后执行建议的删除和插入语句,在这种情况下,您可以考虑通过创建计划的task 定期清理历史记录表,该计划将删除超过某些天的阈值,即 90 天或更长时间您的业务需求,只是为了避免积累如此多的历史记录。
注意:您可以考虑使用streams,而不是为正确的 CDC 建议的历史表,但我选择了更简单的解决方案
【讨论】:
【参考方案2】:您是否尝试过使用 STREAMS?
流类似于变更数据捕获。某个表上的流向您准确显示有关该表的哪些行已添加/删除/更新。因此,您可以使上述逻辑更容易。
更多信息和用于选择更改的此类流的结构可以在此处找到:https://docs.snowflake.com/en/user-guide/streams.html
除此之外,您还可以尝试使用 TASKS。任务是运行特定语句(例如您的截断)并在特定条件下执行的作业(例如您上面的案例)。更多信息请看这里:https://docs.snowflake.com/en/user-guide/tasks-intro.html
最后一个编辑提示另一个提示:也许 MERGE 也适合您的需求,因为它结合了 INSERT 和 UPDATE/DELETE。 https://docs.snowflake.com/en/sql-reference/sql/merge.html
【讨论】:
A stream on a certain table shows you exactly which rows have been added/removed/updated regarding the table
我的数据来自不同的文件,它首先进入一个暂存表。只有在那之后,我才编写了一个逻辑来截断旧数据并插入新数据。我可以在这种情况下使用 Streams 吗?
我探索了合并选项并在问题中添加了我的疑问,因为它太大了,无法在这里发表评论。你能检查一下吗?
您可以尝试在暂存表上创建一个流(CREATE STREAM x ON TABLE y),向表中添加一些数据,然后查询您的流(SELECT * FROM x)。也许结果是进一步处理的一些很好的输入? (也许我弄错了,但是当您的 source_table 与您的 target_table 相同时,您可能会使用处理后删除的临时表之类的东西)
关于您的 MERGE 问题:为什么要删除然后使用相同的键插入?那为什么不更新呢?关于条件:这取决于那里的条件以及您是否使用 UPDATE、DELETE 和/或 INSERT。您可以在有关非确定性和确定性查询 (docs.snowflake.com/en/sql-reference/sql/…) 的部分中找到更多信息,但是当条件匹配时,它会执行(据我所知)
由于以下情况,我无法运行更新:如果我在第 1 天有 3 行用于 L7,而在第 2 天我只得到 L7 的一行,则最终表应该只包含我们从中获得的一行第二天。以上是关于如何截断和加载表中的部分数据?的主要内容,如果未能解决你的问题,请参考以下文章
如何在数据加载之前截断 AWS Glue 作业中的 RDS 表?
FileStream 和 StreamWriter - 如何在写入后截断文件的其余部分?