将 MERGE 语句的更新列审计为不同表中的行

Posted

技术标签:

【中文标题】将 MERGE 语句的更新列审计为不同表中的行【英文标题】:Audit the Update columns of a MERGE statement as rows in a different table 【发布时间】:2020-09-28 13:48:19 【问题描述】:

我正在努力实现以下目标

合并 TARGET 和 SOURCE 表。 使用输出子句审核更改。

我可以使用 CASE 语句执行以下操作:

审计插入和删除很容易。

我将插入和删除转换为以下格式:

| Action   | Record | ChangedFrom | ChangedTo |  
|----------|--------|-------------|-----------|  
| Inserted | 1      | NULL        | 1         |  
| Deleted  | 2      | 2           | NULL      |  

我无法获得:

| Action  | Record | ChangedFrom        | ChangedTo         |  
|---------|--------|--------------------|-------------------|  
| UPDATED | 1      | ColA_Content_Before| ColA_Content_After| 

Merge 语句将更改后的值输出为每行的列数。 我需要将这些列对更改为行,以便记录/行的每个更改的列都成为审计表中的一个条目。 同样在这里,为了示例,我使用了 3 列,实际上,当我不知道列数是多少时,我需要它来工作。

目标表

| Id | ColA | ColB   | ColC   |
|----|------|--------|--------|
| 1  | Orig | Orig_B | Orig_C |

源表

| Id | ColA       | ColB       | ColC   |
|----|------------|------------|--------|
| 1  | Modified_A | Modified_B | Orig_C |

合并结果

| Id | ColA | ColB   | ColC   | Id | ColA      | ColB       | ColC |
|---:|------|--------|--------|----|-----------|------------|------|
| 1  | Orig | Orig_B | Orig_C | 1  | Modifed_A | Modified_C |      |

我需要这些列作为行

|  Action | Record | ChangedFrom | ChangedTo  |
|--------:|--------|-------------|------------|
| Updated | 1      | Orig        | Modified_A |
| Updated | 1      | Orig_B      | MOdified_B |

我尝试过的方法:

在 proc 中使用 MERGE 来执行批处理语句 在这里,我尝试使用游标遍历行,然后在 case 语句中插入 cols。 使用动态 sql 并透视表。

但是没有成功。

任何指针?

要复制的代码

CREATE TABLE [dbo].[Test_Source]
(
   Id int NOT NULL,
   ColA VARCHAR(50) NOT NULL,
   ColB VARCHAR(50) NOT NULL,
   ColC VARCHAR(50) NOT NULL,
   ColD VARCHAR(50) NOT NULL,
   [UDate] [DATETIME] NOT NULL
) ON [PRIMARY];
GO
CREATE TABLE [dbo].[Test_Target]
(
   Id int NOT NULL,
   ColA VARCHAR(50) NOT NULL,
   ColB VARCHAR(50) NOT NULL,
   ColC VARCHAR(50) NOT NULL,
   ColD VARCHAR(50) NOT NULL,
   [UDate] [DATETIME] NOT NULL
) ON [PRIMARY];
GO


---Insert some test values
INSERT INTO [dbo].[Test_Source]
(
   Id,ColA,ColB,ColC,ColD,UDate
)
VALUES
(
  1,'ColA','ColB','ColC','ColD',GETDATE()
);

INSERT INTO [dbo].[Test_Target]
(
   Id,ColA,ColB,ColC,ColD,UDate
)
VALUES
(
  1,'ColA_After','ColB_After','ColC_After','ColD_After',GETDATE()
);

CREATE PROCEDURE MERGE_TEST 
As
BEGIN
        DECLARE @MERGERESULTS TABLE 
        (
         [Action] nvarchar(50),
         Id_After int ,
         ColA_After nvarchar(50),
         ColB_After nvarchar(50),
         ColC_After nvarchar(50),
         ColD_After nvarchar(50),
         UDate_After nvarchar(50),
         Id_Before int,
         ColA_Before nvarchar(50),
         ColB_Before nvarchar(50),
         ColC_Before nvarchar(50),
         ColD_Before nvarchar(50),
         UDate_Before nvarchar(50)
        );

        DECLARE @AUDITRESULTS TABLE 
        (
         [Action] nvarchar(50),
         Id_After int ,
         ChangedFrom nvarchar(50),
         ChangedTo nvarchar(50)      
         );

        DECLARE
         @cols AS NVARCHAR(MAX),
         @query  AS NVARCHAR(MAX)

        MERGE Test_Target as T
        USING Test_Source as S
        ON T.Id = S.Id
        WHEN MATCHED AND
        T.ColA <> S.ColA OR
        T.ColB <> S.ColB OR
        T.ColC <> S.ColC OR
        T.ColD <> S.ColD OR
        T.UDate <> S.UDate 
        THEN UPDATE SET
        T.ColA = S.ColA ,
        T.ColB = S.ColB ,
        T.ColC = S.ColC ,
        T.ColD = S.ColD ,
        T.UDate = S.UDate 
        WHEN NOT MATCHED BY TARGET
        THEN INSERT 
        (Id,ColA,ColB,ColC,ColD,UDate)
         VALUES
        (S.Id,S.ColA,S.ColB,S.ColC,S.ColD,S.UDate)
         WHEN NOT MATCHED BY SOURCE 
        THEN DELETE 
        OUTPUT $action,Inserted.*,Deleted.* into @MERGERESULTS ;

        select * from @MERGERESULTS;

        select * into #mytemp from @MERGERESULTS

        -- Dynamic sql to pivot table
            select @cols = STUFF((SELECT distinct ',' + QUOTENAME(ColA_After) 
                        from @MERGERESULTS
                FOR XML PATH(''), TYPE
                ).value('.', 'NVARCHAR(MAX)') 
                 ,1,1,'')

            set @query = 'Select * from #mytemp
                    pivot 
                    (
                        max(Id_After)
                        for ColA_After in (' + @cols + ')
                    ) p '

            select  @Cols as 'Columns';

            execute(@query)

     -- Dynamic sql to pivot Table

-- Trying cursor and row count (incomplete)
            --declare @Id int

            --set rowcount 0
            --select 
            --t.Action ,
            --CASE WHEN (t.ColA_After <> t.ColA_Before)
            --  THEN t.ColA_After  
            --END,
            --CASE WHEN (t.ColA_After <> t.ColA_Before)
            --  THEN t.ColA_Before  
            --END,

            --CASE WHEN (t.ColB_After <> t.ColB_Before)
            --  THEN t.ColB_After  
            --END,
            --CASE WHEN (t.ColB_After <> t.ColB_Before)
            --  THEN t.ColB_Before  
            --END,

            --CASE WHEN (t.ColC_After <> t.ColC_Before)
            --  THEN t.ColC_After  
            --END,
            --CASE WHEN (t.ColC_After <> t.ColC_Before)
            --  THEN t.ColC_Before 
            --END,

            --CASE WHEN (t.ColD_After <> t.ColD_Before)
            --  THEN t.ColD_After  
            --END,
            --CASE WHEN (t.ColD_After <> t.ColD_Before)
            --  THEN t.ColD_Before  
            --END
            --from #mytemp t
            --inner join #mytemp t1
            --on t.Id_After = t1.Id_Before
            --where 
            --t.ColA_After <> t1.ColA_Before OR
            --t.ColB_After <> t1.ColB_Before OR
            --t.ColC_After <> t1.ColC_Before OR
            --t.ColD_After <> t1.ColD_Before 
            --set rowcount 1

            --select @Id = Id_After from #mytemp

            --while @@rowcount <> 0
            --begin
            --  set rowcount 0
            --  (select * from #mytemp where Id_After = @Id)

            --  delete #mytemp where au_id = @au_id

            --  set rowcount 1
            --  select @au_id = au_id from #mytemp
            --end
            --set rowcount 0

END

EXEC MERGE_TEST

这似乎是一个切线

【问题讨论】:

【参考方案1】:

我通过使用 unpivot 实现了上述目标

不确定这是否是最好的方法,但可以完成工作。

我的解决方案:

使用过程并选择 MERGE 结果到 TABLE 变量中,列后缀为 _Before 和 _After。 使用 where 条件审核插入和更新。 使用 UNPIVOT 审核更新。 pivot col 值可用于获取更新后的列名。 UNPIVOT 需要所有 cols 具有相同的数据类型,因此可以对它们进行 CAST/CONVERT,我刚刚在选择时将所有列存储为 nvarchars()。

创建审核脚本

SET ANSI_NULLS ON
GO

SET QUOTED_IDENTIFIER ON
GO

CREATE TABLE [dbo].[Audit](
    [Id] [int] IDENTITY(1,1) NOT NULL,
    [ChangedFrom] [nvarchar](100) NULL,
    [ChangedTo] [nvarchar](100) NULL,
    [Action] [nvarchar](100) NULL,
    [ChangedDate] [datetime2](7) NULL,
    [Column] [nvarchar](100) NULL
 CONSTRAINT [PK_Audit] PRIMARY KEY CLUSTERED 
(
    [Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
GO

创建过程脚本

CREATE PROCEDURE MERGE_AUDIT
As
BEGIN
        --Temporary TABLE variable to store the MERGE results
        DECLARE @MERGERESULTS TABLE 
        (
         [Action] nvarchar(50),
         Id_After nvarchar(50),
         ColA_After nvarchar(50),
         ColB_After nvarchar(50),
         ColC_After nvarchar(50),
         ColD_After nvarchar(50),
         UDate_After nvarchar(50),
         Id_Before nvarchar(50),
         ColA_Before nvarchar(50),
         ColB_Before nvarchar(50),
         ColC_Before nvarchar(50),
         ColD_Before nvarchar(50),
         UDate_Before nvarchar(50)
        );

        MERGE Test_Target as T
        USING Test_Source as S
        ON T.Id = S.Id
        WHEN MATCHED AND
        T.ColA <> S.ColA OR
        T.ColB <> S.ColB OR
        T.ColC <> S.ColC OR
        T.ColD <> S.ColD OR
        T.UDate <> S.UDate 
        THEN UPDATE SET
        T.ColA = S.ColA ,
        T.ColB = S.ColB ,
        T.ColC = S.ColC ,
        T.ColD = S.ColD ,
        T.UDate = S.UDate 
        WHEN NOT MATCHED BY TARGET
        THEN INSERT 
        (Id,ColA,ColB,ColC,ColD,UDate)
         VALUES
        (S.Id,S.ColA,S.ColB,S.ColC,S.ColD,S.UDate)
         WHEN NOT MATCHED BY SOURCE 
        THEN DELETE 
        OUTPUT $action,Inserted.*,Deleted.* into @MERGERESULTS ;

        -- Audit the Inserts
        insert into [Audit]
        (
            ChangedFrom,
            ChangedTo,
            Action,
            ChangedDate
        )
        select 
            Id_Before,
            Id_After,
            [Action],
            GETDATE()
        from @MERGERESULTS where [Action] = 'INSERT' ;

        --Audit the DELETES
        insert into [Audit]
        (
            ChangedFrom,
            ChangedTo,
            Action,
            ChangedDate
        )
        select 
            Id_Before,
            Id_After,
            [Action],
            GETDATE()
        from @MERGERESULTS where [Action] = 'DELETE' ;

        --unpivot to audit the UPDATES,
        --USING REPLACE to remove the AFTER suffix when getting the column name
        -- When storing the MERGE results in the TABLE variable having all columns as NVARCHAR is needed as to UNPIVOT the datataypes are needed to be same
        insert into [Audit]
        (
            ChangedFrom,
            ChangedTo,
            Action,
            ChangedDate,
            [Column]
        )
        select * from (select 
            ChangedFrom,
            ChangedTo,
            [Action],
            GETDATE() as ChangedDate,
            REPLACE(p1col, '_After', '')  as [Column]
            from 
            (select 
            *
            from @MERGERESULTS
            UNPIVOT
            (
                   ChangedFrom
                   FOR pcol IN (ColA_Before,ColB_Before,ColC_Before,ColD_Before)
            ) AS P
            UNPIVOT
            (
                ChangedTo 
                For p1col IN (ColA_After,ColB_After,ColC_After,ColD_After )
            ) p1) as a
            where 
            a.pcol = 'ColA_Before' and a.p1col = 'ColA_After' OR
            a.pcol = 'ColB_Before' and a.p1col = 'ColB_After' OR
            a.pcol = 'ColC_Before' and a.p1col = 'ColC_After' OR
            a.pcol = 'ColD_Before' and a.p1col = 'ColD_After') as o -- this where condition is needed to get only relevant results, else we will get all the cols as it is a cross join
        where o.ChangedFrom <> o.ChangedTo;
END

执行结果

EXEC MERGE_AUDIT

【讨论】:

以上是关于将 MERGE 语句的更新列审计为不同表中的行的主要内容,如果未能解决你的问题,请参考以下文章

带条件的 Oracle SQL 合并语句

oracle merge into用法

update和insert的区别

SQL 基础之子查询多表插入merge 语句跟踪一段时间数据变化(二十)

Mysql根据不同的行更新表中的行值

SQL Server merge用法