如何截断和加载表中的部分数据?

Posted

技术标签:

【中文标题】如何截断和加载表中的部分数据?【英文标题】:How to truncate and load part of data in a table? 【发布时间】:2020-09-24 06:32:08 【问题描述】:

我使用的是没有分区机制的雪花表,而是有微分区。 我们有一个新的要求,即在桌子上执行不同类型的满载。

场景: 我有一个包含三列的表:ID、名称、Current_Location 第一天的记录:

Se  Name    Current_Location     Rate
1   A           L1               100
2   B           L2               200
3   C           L3               300
4   D           L4               400
5   E           L5               500
6   F           L6               600
7   G           L7               700
8   H           L7               800

我的要求是我每天以不同的速率为每个 Current_Location 获取新数据,即 第2天:

Se  Name    Current_Location     Rate
6   P           L6               6000
7   G           L7               7000
8   H           L7               1100
9   Z           L7               1200

根据列中的值:Current_Location,我先截断了以前的记录,然后加载新记录。例如,使用 Current_Location L7 在上述场景中,第 1 天的 L7 有两条记录,但在第 2 天,我得到了 3 条记录。 所以我必须截断

7   G           L7               700
8   H           L7               800

然后将 Day2 中的所有三个新记录加载到我的表中。 L6也是如此。截断和加载后的最终表应如下所示:

Se  Name    Current_Location     Rate
1   A           L1               100
2   B           L2               200
3   C           L3               300
4   D           L4               400
5   E           L5               500
6   P           L6               6000
7   G           L7               7000
8   H           L7               1100
9   Z           L7               1200

为了实现这一点,我想到了实现:

    将所有新数据插入到 target_table 中。此时,target_table 也包含新旧记录。 然后通过选择 max(Current_Location) 在其上创建一个视图并将其公开给用户。

create or replace view final_view as select * from target_table where Current_Location = (select max(Current_Location) from target_table)

但是我必须稍后用旧的 Current_Location 截断旧记录,一旦我删除它们,我的表和视图就会变得相同。 我正在使用 Snowflake 数据库,尽管有微分区,但其中没有分区的概念。 有没有什么有效的方法来做同样的操作?

Edit1:我探索了合并查询:

MERGE INTO t1 USING t2 ON t1.t1Key = t2.t2Key
    WHEN MATCHED AND t2.marked = 1 THEN DELETE
    WHEN MATCHED AND t2.isNewStatus = 1 THEN UPDATE SET val = t2.newVal, status = t2.newStatus
    WHEN MATCHED THEN UPDATE SET val = t2.newVal
    WHEN NOT MATCHED THEN INSERT (val, status) VALUES (t2.newVal, t2.newStatus);

一旦一个案例被执行,是否也会检查下一个案例? 例如:一行匹配 DELETE 条件,它被删除。现在我必须将具有相同键的行插入表中。 WHEN NOT MACTHED 的案子也会被执行吗?或者一旦一个case被执行,控件就退出MERGE Query?

【问题讨论】:

我认为您需要在此处提供更多信息。此外,这看起来像是您已经问过的重复问题?这里:***.com/questions/64044360/… 是的,我有更多关于这个问题的信息,因为我有办法做到这一点(我还在那个问题中发布了我的做法)。我正在编辑它,但在将我的更改从记事本添加到浏览器之前错误地保存了它。就在不久前为主持人干预提出了一个标志。所以我在那里发布了这个问题。 【参考方案1】:

我看到MERGE 语句会造成麻烦,因为目标表中的行将与源中的多行连接,即如here 所解释的那样是不确定的

你呢?首先delete使用来自新数据集的唯一值Current_Location的记录,然后insert收到所有新记录?

DELETE FROM T1
   WHERE Current_Location in (SELECT DISTINCT Current_Location FROM T2);

INSERT INTO T1
   SELECT * FROM T2;

这似乎产生了您需要的决赛桌,如果我遗漏了什么,请纠正我。

为了跟踪此类操作,您可以引入两个历史表 T3、T4,其中 T3 保存每日删除的记录以及删除日期/时间戳,T4 保存新收到的记录以及插入日期/时间戳

所以首先,执行类似

INSERT INTO T3 
Select *, CURRENT_DATE   // Or CURRENT_TIMESTAMP safer if multiple runs received on same day
     FROM T1
       WHERE Current_Location in (SELECT DISTINCT Current_Location FROM T2);

INSERT INTO T4 
Select *, CURRENT_DATE   // Or CURRENT_TIMESTAMP safer if multiple runs received on same day
     FROM T2;

然后执行建议的删除和插入语句,在这种情况下,您可以考虑通过创建计划的task 定期清理历史记录表,该计划将删除超过某些天的阈值,即 90 天或更长时间您的业​​务需求,只是为了避免积累如此多的历史记录。

注意:您可以考虑使用streams,而不是为正确的 CDC 建议的历史表,但我选择了更简单的解决方案

【讨论】:

【参考方案2】:

您是否尝试过使用 STREAMS?

流类似于变更数据捕获。某个表上的流向您准确显示有关该表的哪些行已添加/删除/更新。因此,您可以使上述逻辑更容易。

更多信息和用于选择更改的此类流的结构可以在此处找到:https://docs.snowflake.com/en/user-guide/streams.html

除此之外,您还可以尝试使用 TASKS。任务是运行特定语句(例如您的截断)并在特定条件下执行的作业(例如您上面的案例)。更多信息请看这里:https://docs.snowflake.com/en/user-guide/tasks-intro.html

最后一个编辑提示另一个提示:也许 MERGE 也适合您的需求,因为它结合了 INSERT 和 UPDATE/DELETE。 https://docs.snowflake.com/en/sql-reference/sql/merge.html

【讨论】:

A stream on a certain table shows you exactly which rows have been added/removed/updated regarding the table 我的数据来自不同的文件,它首先进入一个暂存表。只有在那之后,我才编写了一个逻辑来截断旧数据并插入新数据。我可以在这种情况下使用 Streams 吗? 我探索了合并选项并在问题中添加了我的疑问,因为它太大了,无法在这里发表评论。你能检查一下吗? 您可以尝试在暂存表上创建一个流(CREATE STREAM x ON TABLE y),向表中添加一些数据,然后查询您的流(SELECT * FROM x)。也许结果是进一步处理的一些很好的输入? (也许我弄错了,但是当您的 source_table 与您的 target_table 相同时,您可能会使用处理后删除的临时表之类的东西) 关于您的 MERGE 问题:为什么要删除然后使用相同的键插入?那为什么不更新呢?关于条件:这取决于那里的条件以及您是否使用 UPDATE、DELETE 和/或 INSERT。您可以在有关非确定性和确定性查询 (docs.snowflake.com/en/sql-reference/sql/…) 的部分中找到更多信息,但是当条件匹配时,它会执行(据我所知) 由于以下情况,我无法运行更新:如果我在第 1 天有 3 行用于 L7,而在第 2 天我只得到 L7 的一行,则最终表应该只包含我们从中获得的一行第二天。

以上是关于如何截断和加载表中的部分数据?的主要内容,如果未能解决你的问题,请参考以下文章

如何在数据加载之前截断 AWS Glue 作业中的 RDS 表?

由于数据中的“雪花问题”导致部分加载

FileStream 和 StreamWriter - 如何在写入后截断文件的其余部分?

SQL中如何查询A表中的数据有部分存在B表中并显示出来

MySQL: 13 基于LRU算法淘汰Buffer Pool中的部分缓存

数据库部分(MySql)_3