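Big Data Real-Time Data Sync: OGG Heterogeneous Multi-Way Mapping to Sync the Source Table & Audit Table & Deletes-Only Table (Implementation)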

Posted by 赵延东的一亩三分地

Preface

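This is the continuation! The previous part got so long that the editor froze, so I had to finish it in a second article.
Previous article: [Big Data Real-Time Data Sync] OGG Multi-Way Mapping to Sync the Source Table & Audit Table & Deletes-Only Table, Implementation (Part 1)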

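My organization currently uses Oracle GoldenGate to consolidate its production business databases into a real-time ODS platform for the data warehouse. The source databases may be Oracle, MySQL, Dameng (DM), or GaussDB. To support incremental ETL and to record the change history of every row: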

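  • We use heterogeneous replication: every table synced from the source gets an extra etltime column in the ODS that records when the row was last changed.
  • To keep the transactional change history of the data, every change is mapped into a tab_name_audit table.
  • To guard against accidental deletions in the source business databases, deleted rows are mapped into a tab_name_his table.
  • The source table itself is still mapped into the ODS with ordinary DML replication.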
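To make the fan-out above concrete, here is a small Python model of which ODS tables a single source change reaches. It is only a sketch under my own assumptions: the route function is hypothetical, it follows the tab_name_audit / tab_name_his naming from the list above, and it ignores the etltime column and everything GoldenGate actually does at apply time.

```python
# Hypothetical model of the multi-way mapping described above.
# It only answers "which target tables receive this change?"; it is not
# GoldenGate code and does not model how each target applies the row.

def route(table, op):
    """Return the ODS tables that receive one source change of type `op`."""
    base = table.upper()
    targets = [base, f"{base}_AUDIT"]  # ODS copy + change-history table
    if op.upper() == "DELETE":
        targets.append(f"{base}_HIS")  # deleted rows are also kept here
    return targets


if __name__ == "__main__":
    for op in ("INSERT", "UPDATE", "DELETE"):
        print(op, route("dept", op))
```

The point of the model is that every change lands in the audit table, while the HIS table only ever sees deletes.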

Below I simulate how all of this is implemented on my local machine. For installing GoldenGate, see my other articles; I won't repeat it here!


11. Switch the replication logic of all synced HIS tables under SCOTT to DEL-table behavior

1. First, here is what the extract and replicat configurations will be changed to

--------------Extract process:
GGSCI (11g) 4> edit params e_sc

extract e_sc
userid ogg,password ogg
setenv(NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
setenv(ORACLE_SID="orcl")
reportcount every 10 minutes,rate
numfiles 5000
discardfile ./dirrpt/e_sc.dsc,append,megabytes 1000
warnlongtrans 2h,checkinterval 3m
exttrail ./dirdat/sc
threadoptions maxcommitpropagationdelay 60000
dboptions allowunusedcolumn
tranlogoptions archivedlogonly
tranlogoptions altarchivelogdest primary /u01/archivelog
TRANLOGOPTIONS ALTARCHIVEDLOGFORMAT %t_%s_%r.dbf
dynamicresolution
ddl include mapped
ddloptions addtrandata,report
notcpsourcetimer
nocompressupdates
--fetchoptions nousesnapshot
fetchoptions USEROWID
--GETUPDATEBEFORES
NOCOMPRESSDELETES
----------scott.EMP
table SCOTT.EMP,tokens(
TKN-CSN = @GETENV("TRANSACTION", "CSN"),
TKN-COMMIT-TS = @GETENV ("GGHEADER", "COMMITTIMESTAMP"),
TKN-OP-TYPE = @GETENV ("GGHEADER", "OPTYPE")
);
----------SCOTT.DEPT
table SCOTT.DEPT,tokens(
TKN-CSN = @GETENV("TRANSACTION", "CSN"),
TKN-COMMIT-TS = @GETENV ("GGHEADER", "COMMITTIMESTAMP"),
TKN-OP-TYPE = @GETENV ("GGHEADER", "OPTYPE")
);
----------SCOTT.DEL
table SCOTT.DEL,tokens(
TKN-CSN = @GETENV("TRANSACTION", "CSN"),
TKN-COMMIT-TS = @GETENV ("GGHEADER", "COMMITTIMESTAMP"),
TKN-OP-TYPE = @GETENV ("GGHEADER", "OPTYPE")
);
----------SCOTT.nopk_tab
table SCOTT.NOPK_TAB,keycols(EMPNO,ENAME),tokens(
TKN-CSN = @GETENV("TRANSACTION", "CSN"),
TKN-COMMIT-TS = @GETENV ("GGHEADER", "COMMITTIMESTAMP"),
TKN-OP-TYPE = @GETENV ("GGHEADER", "OPTYPE")
);
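Every table in the extract file repeats the same three tokens, so with many tables it can help to generate these clauses instead of copying them by hand. The helper below is a hypothetical sketch (extract_table_clause is my own name, not a GoldenGate tool); it only reproduces the text pattern shown above, including KEYCOLS for a table without a primary key such as NOPK_TAB.

```python
# Hypothetical generator for the repeated extract TABLE clause shown above:
# the same three tokens for every table, plus optional KEYCOLS for tables
# that have no primary key.

TOKENS = (
    'TKN-CSN = @GETENV("TRANSACTION", "CSN"),',
    'TKN-COMMIT-TS = @GETENV ("GGHEADER", "COMMITTIMESTAMP"),',
    'TKN-OP-TYPE = @GETENV ("GGHEADER", "OPTYPE")',
)


def extract_table_clause(owner, table, keycols=None):
    """Return the comment line plus the TABLE ... tokens(...) clause."""
    name = f"{owner.upper()}.{table.upper()}"
    keys = f",keycols({','.join(keycols)})" if keycols else ""
    lines = [f"----------{name}", f"table {name}{keys},tokens("]
    lines.extend(TOKENS)
    lines.append(");")
    return "\n".join(lines)


if __name__ == "__main__":
    print(extract_table_clause("scott", "dept"))
    print(extract_table_clause("scott", "nopk_tab", ["EMPNO", "ENAME"]))
```

The printed text can be pasted into the extract parameter file; keeping the token list in one place avoids one table silently missing a token that the replicat colmaps rely on.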

----------------------Replicat process configuration
----R_SC2 process
GGSCI (OGG) 25> edit params r_sc2

replicat r_sc2
setenv(NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
userid ogg,password ogg
reportcount every 30 minutes,rate
reperror default,abend
--reperror default,DISCARD
numfiles 50000
checkpointsecs 40
assumetargetdefs
discardfile ./dirrpt/r_sc2.dsc,append,megabytes 1000
allownoopupdates
ddl &
include mapped &
exclude objname scott.*_audit &
exclude optype create &
        objtype 'table' &
exclude optype drop &
        objtype 'table' &
exclude objtype 'index' &
        objname SCOTT.*_his &
exclude instr 'constraint' &
---exclude instr 'null' &
exclude instr 'trigger' &
exclude instr 'rename to' &
exclude instr 'grant' &
exclude instr 'revoke' &
exclude instr 'analyze'
DDLOPTIONS MAPSESSIONSCHEMA XUM target ogg
ddloptions report
allowduptargetmap
----------DEPT
getinserts
getupdates
getdeletes
noupdatedeletes
NOINSERTDELETES
map SCOTT.DEPT,target SCOTT.DEPT;
updatedeletes
getinserts
ignoreupdates
ignoredeletes
map SCOTT.DEPT,target SCOTT.DEPT_AUDIT,keycols(DEPTNO),colmap(
usedefaults,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@token("TKN-OP-TYPE"),
inserttime=@token("TKN-COMMIT-TS"),
curdate=@DATENOW()
);
ignoreinserts
getupdates
getdeletes
map SCOTT.DEPT,target SCOTT.DEPT_AUDIT,keycols(DEPTNO),colmap(
usedefaults,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@token("TKN-OP-TYPE"),
curdate=@DATENOW()
),filter(@strfind(@token("TKN-OP-TYPE"),"PK UPDATE") =0);
ignoreinserts
getupdates
getdeletes
map SCOTT.DEPT,target SCOTT.DEPT_AUDIT,keycols(DEPTNO),colmap(
usedefaults,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@case(@token("TKN-OP-TYPE"),"PK UPDATE","INSERT"),
inserttime=@token("TKN-COMMIT-TS"),
curdate=@DATENOW()
),filter(@strfind(@token("TKN-OP-TYPE"),"PK UPDATE") >0),insertallrecords;
ignoreinserts
getupdates
getdeletes
map SCOTT.DEPT,target SCOTT.DEPT_AUDIT,keycols(DEPTNO),colmap(
DEPTNO=before.DEPTNO,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@case(@token("TKN-OP-TYPE"),"PK UPDATE","DELETE"),
curdate=@DATENOW()
),filter(@strfind(@token("TKN-OP-TYPE"),"PK UPDATE") >0);
ignoreinserts
ignoreupdates
getdeletes
noupdatedeletes
INSERTDELETES
MAP SCOTT.DEPT, TARGET SCOTT.DEPT_DEL,keycols(DEPTNO),colmap(
usedefaults,
BEFORE_AFTER=@getenv("GGHEADER", "BEFOREAFTERINDICATOR"),
DEL_DATE=@getenv ("GGHEADER", "COMMITTIMESTAMP"),
OP_FLAG=@getenv ("GGHEADER", "OPTYPE")
);
map SCOTT.DEPT,target SCOTT.DEPT_HIS,keycols(DEPTNO);
----------NOPK_TAB
getinserts
getupdates
getdeletes
noupdatedeletes
NOINSERTDELETES
map SCOTT.NOPK_TAB,target SCOTT.NOPK_TAB,keycols(EMPNO,ENAME);
updatedeletes
getinserts
getupdates
getdeletes
map SCOTT.NOPK_TAB,target SCOTT.NOPK_TAB_AUDIT,keycols(EMPNO,ENAME),colmap(
EMPNO=@if(@strfind(@token("tkn-op-type"),"PK UPDATE")>0,before.EMPNO,EMPNO),
ENAME=@if(@strfind(@token("tkn-op-type"),"PK UPDATE")>0,before.ENAME,ENAME),
csn=@token("tkn-csn"),
optime=@token("tkn-commit-ts"),
optype=@if(@strfind(@token("tkn-op-type"),"PK UPDATE")>0,"DELETE",@token("tkn-op-type")),
inserttime=@eval(@strfind(@token("tkn-op-type"),"INSERT")>0,@token("tkn-commit-ts")),
curdate=@DATENOW()
);
ignoreinserts
getupdates
ignoredeletes
map SCOTT.NOPK_TAB,target SCOTT.NOPK_TAB_AUDIT,keycols(EMPNO,ENAME),colmap(
usedefaults,
csn=@token("tkn-csn"),
optime=@token("tkn-commit-ts"),
optype=@case(@token("tkn-op-type"),"PK UPDATE","INSERT"),
inserttime=@token("tkn-commit-ts"),
curdate=@DATENOW()
),filter(@strfind(@token("tkn-op-type"),"PK UPDATE") >0),insertallrecords;
ignoreinserts
ignoreupdates
getdeletes
noupdatedeletes
INSERTDELETES
MAP SCOTT.NOPK_TAB, TARGET SCOTT.NOPK_TAB_DEL,keycols(EMPNO,ENAME),colmap(
usedefaults,
BEFORE_AFTER=@getenv("GGHEADER", "BEFOREAFTERINDICATOR"),
DEL_DATE=@getenv ("GGHEADER", "COMMITTIMESTAMP"),
OP_FLAG=@getenv ("GGHEADER", "OPTYPE")
);
map SCOTT.NOPK_TAB,target SCOTT.NOPK_TAB_HIS,keycols(EMPNO,ENAME);
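The DEPT audit maps are easiest to follow as a function of the TKN-OP-TYPE token. Below is a simplified Python model, not GoldenGate code, of the audit rows the four DEPT_AUDIT maps are meant to produce: a PK UPDATE is split into an INSERT row carrying the new key and a DELETE row carrying only the old key (before.DEPTNO). The dict record format and the audit_rows name are assumptions, and CURDATE (@DATENOW()) is left out.

```python
# Simplified model, NOT GoldenGate code: which DEPT_AUDIT rows the audit
# MAP statements above are meant to produce for one captured change.
# Records are plain dicts; column names follow the colmap clauses.

def audit_rows(op_type, before, after, key, csn, commit_ts):
    """op_type is the TKN-OP-TYPE value; before/after are row images or None."""
    meta = {"CSN": csn, "OPTIME": commit_ts}
    if "PK UPDATE" in op_type:  # mirrors @strfind(...,"PK UPDATE") > 0
        # After image is written as a new row labelled INSERT (insertallrecords).
        new_row = {**after, **meta, "OPTYPE": "INSERT", "INSERTTIME": commit_ts}
        # Only the old key value (before.<key>) is kept, labelled DELETE.
        old_row = {key: before[key], **meta, "OPTYPE": "DELETE"}
        return [new_row, old_row]
    if op_type == "INSERT":
        # Inserts also fill INSERTTIME from the commit timestamp.
        return [{**after, **meta, "OPTYPE": op_type, "INSERTTIME": commit_ts}]
    # Other updates and deletes keep their own op type (filter "PK UPDATE" = 0).
    image = after if after is not None else before
    return [{**image, **meta, "OPTYPE": op_type}]


if __name__ == "__main__":
    for row in audit_rows("PK UPDATE", {"DEPTNO": 10, "DNAME": "SALES"},
                          {"DEPTNO": 40, "DNAME": "SALES"}, "DEPTNO",
                          "1684462", "2018-02-26 09:19:12"):
        print(row)
```

Splitting a key change into DELETE(old key) plus INSERT(new key) is what keeps the audit trail readable per key value: each DEPTNO's history stays under its own key.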

----R_SC process:
GGSCI (OGG) 24> edit params r_sc


replicat r_sc
setenv(NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
userid ogg,password ogg
reportcount every 30 minutes,rate
reperror default,abend
numfiles 50000
checkpointsecs 40
assumetargetdefs
discardfile ./dirrpt/r_sc.dsc,append,megabytes 1000
allownoopupdates
ddl &
include mapped &
exclude objname scott.*_audit &
exclude optype create &
        objtype 'table' &
exclude optype drop &
        objtype 'table' &
exclude objtype 'index' &
        objname SCOTT.*_his &
exclude instr 'constraint' &
---exclude instr 'null' &
exclude instr 'trigger' &
exclude instr 'rename to' &
exclude instr 'grant' &
exclude instr 'revoke' &
exclude instr 'analyze'
DDLOPTIONS MAPSESSIONSCHEMA XUM target ogg
ddloptions report
allowduptargetmap
--INSERTALLRECORDS
----------EMP
getinserts
getupdates
getdeletes
noupdatedeletes
noinsertdeletes
map SCOTT.EMP,target SCOTT.EMP;
updatedeletes
--map SCOTT.EMP,target SCOTT.EMP_HIS,keycols(EMPNO);
getinserts
ignoreupdates
ignoredeletes
map SCOTT.EMP,target SCOTT.EMP_AUDIT,keycols(EMPNO),colmap(
usedefaults,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@token("TKN-OP-TYPE"),
inserttime=@token("TKN-COMMIT-TS"),
curdate=@DATENOW()
);
ignoreinserts
getupdates
getdeletes
map SCOTT.EMP,target SCOTT.EMP_AUDIT,keycols(EMPNO),colmap(
usedefaults,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@token("TKN-OP-TYPE"),
curdate=@DATENOW()
),filter(@strfind(@token("TKN-OP-TYPE"),"PK UPDATE") =0);
ignoreinserts
getupdates
getdeletes
map SCOTT.EMP,target SCOTT.EMP_AUDIT,keycols(EMPNO),colmap(
usedefaults,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@case(@token("TKN-OP-TYPE"),"PK UPDATE","INSERT"),
inserttime=@token("TKN-COMMIT-TS"),
curdate=@DATENOW()
),filter(@strfind(@token("TKN-OP-TYPE"),"PK UPDATE") >0),insertallrecords;
ignoreinserts
getupdates
getdeletes
map SCOTT.EMP,target SCOTT.EMP_AUDIT,keycols(EMPNO),colmap(
EMPNO=before.EMPNO,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@case(@token("TKN-OP-TYPE"),"PK UPDATE","DELETE"),
curdate=@DATENOW()
),filter(@strfind(@token("TKN-OP-TYPE"),"PK UPDATE") >0);
ignoreinserts
ignoreupdates
getdeletes
noupdatedeletes
INSERTDELETES
map SCOTT.EMP,target SCOTT.EMP_HIS,keycols(EMPNO);
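The deletes-only HIS mapping that closes each table's section (ignoreinserts / ignoreupdates / getdeletes / noupdatedeletes / INSERTDELETES, then the MAP) is identical apart from the names, so it can be generated too. This is a hypothetical sketch; the optional his_table argument is meant to take the HIS_TABLENAME value from ogg_tables_info, which is my assumption about how that column would be used.

```python
# Hypothetical generator for the "deletes only" HIS block used at the end
# of each table's section in r_sc / r_sc2. The parameter lines are copied
# from the configs above; the function itself is illustrative.

def his_map_block(owner, table, keycols, his_table=None):
    name = f"{owner.upper()}.{table.upper()}"
    target = f"{owner.upper()}.{(his_table or table + '_HIS').upper()}"
    return "\n".join([
        "ignoreinserts",    # this MAP skips inserts
        "ignoreupdates",    # ...and updates
        "getdeletes",       # only deletes are processed
        "noupdatedeletes",  # turn off the UPDATEDELETES set earlier
        "INSERTDELETES",    # apply each delete as an insert into HIS
        f"map {name},target {target},keycols({','.join(keycols)});",
    ])


if __name__ == "__main__":
    print(his_map_block("scott", "emp", ["EMPNO"]))
```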

2. Before starting, stop the extract process on the source side

--------------Stop the extract process
stop e_sc
Check where the extract stopped: note the "Seqno" and "RBA" under its "Log Read Checkpoint" and under its "Target Extract Trails".
GGSCI (11g) 7> info e_Sc detail

EXTRACT    E_SC      Last Started 2018-02-23 16:22  Status STOPPED
Checkpoint Lag      00:00:00 (updated 00:00:31 ago)
Log Read Checkpoint  Oracle Redo Logs
                    2018-02-26 09:19:12  Seqno 559, RBA 52736
                    SCN 0.1684462 (1684462)

  Target Extract Trails:

  Remote Trail Name                                Seqno        RBA    Max MB

  ./dirdat/sc                                        79      18167        500
--------------Stop the data pump process
Before stopping the data pump, check the "Seqno" and "RBA" of its "Log Read Checkpoint":
info D_SC detail
GGSCI (11g) 8> info D_SC detail

EXTRACT    D_SC      Last Started 2018-02-12 14:20  Status RUNNING
Checkpoint Lag      00:00:00 (updated 00:00:06 ago)
Log Read Checkpoint  File ./dirdat/sc000079
                    2018-02-26 09:19:12.000000  RBA 18167

  Target Extract Trails:

  Remote Trail Name                                Seqno        RBA    Max MB

  ./dirdat/sc                                        56      20937        500

--------------Only stop the data pump if these values match the "Seqno" and "RBA" of the extract's "Target Extract Trails"
stop D_SC 
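The Seqno/RBA comparison in this step can be scripted against pasted GGSCI output. A minimal sketch, assuming the "info ... detail" layout shown in this article (other GoldenGate versions may print these lines differently): it compares the upstream process's "Target Extract Trails" row with the downstream process's "Log Read Checkpoint". The same check applies between the data pump and the replicats. The function names are hypothetical.

```python
import re

# Minimal sketch, assuming the GGSCI "info <group> detail" layout shown in
# this article. Question answered: has the downstream process read
# everything the upstream process wrote?

TRAIL_ROW = re.compile(r"^[ \t]*\S+[ \t]+(\d+)[ \t]+(\d+)[ \t]+\d+[ \t]*$", re.M)
READ_FILE = re.compile(r"Log Read Checkpoint\s+File\s+\S*?(\d+)[ \t]*$", re.M)
RBA = re.compile(r"RBA\s+(\d+)")


def target_trail_position(detail):
    """(Seqno, RBA) of the first data row under 'Target Extract Trails:'."""
    section = detail.split("Target Extract Trails:", 1)[1]
    m = TRAIL_ROW.search(section)
    return int(m.group(1)), int(m.group(2))


def read_position(detail):
    """(Seqno, RBA) from 'Log Read Checkpoint  File ./dirdat/scNNNNNN'."""
    f = READ_FILE.search(detail)       # Seqno = digits at the end of the file name
    r = RBA.search(detail, f.end())    # first RBA after that line
    return int(f.group(1)), int(r.group(1))


def safe_to_stop(upstream_detail, downstream_detail):
    """True when the downstream read checkpoint equals the upstream trail end."""
    return target_trail_position(upstream_detail) == read_position(downstream_detail)
```

With the output above, the extract's trail row gives (79, 18167) and the pump reads ./dirdat/sc000079 at RBA 18167, so safe_to_stop returns True and D_SC can be stopped.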

--------------Stop the replicat processes
Before stopping the replicats, check the "Seqno" and "RBA" of each one's "Log Read Checkpoint":

info r_sc detail 
GGSCI (OGG) 26> info r_sc detail

REPLICAT  R_SC      Last Started 2018-02-26 10:08  Status RUNNING
Checkpoint Lag      00:00:00 (updated 00:00:10 ago)
Log Read Checkpoint  File ./dirdat/sc000056
                    First Record  RBA 20937
..................

info r_sc2 detail 

GGSCI (OGG) 27> info r_sc2 detail

REPLICAT  R_SC2    Last Started 2018-02-23 15:55  Status RUNNING
Checkpoint Lag      00:00:00 (updated 00:00:16 ago)
Log Read Checkpoint  File ./dirdat/sc000056
                    2018-02-26 09:19:12.014722  RBA 20937
.................

Only stop the replicats if the "Seqno" and "RBA" of both replicats above match the data pump's "Target Extract Trails" values recorded earlier.
They match here: both read from sc000056 at RBA 20937, so stop the replicats.
stop r_sc
stop r_sc2
GGSCI (OGG) 29> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING
REPLICAT    STOPPED     R_SC        00:00:00      00:00:01
REPLICAT    STOPPED     R_SC2       00:00:00      00:00:01
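Before swapping parameter files, it is also worth confirming from info all that every replicat really is STOPPED. A minimal sketch that parses pasted info all output; it assumes whitespace-separated columns in the order Program, Status, Group as shown above, and the function names are hypothetical.

```python
# Minimal sketch (not a GoldenGate utility): parse pasted GGSCI "info all"
# output and check that the given groups are STOPPED.

def parse_info_all(text):
    """Map group name -> status for every EXTRACT/REPLICAT row."""
    status = {}
    for line in text.splitlines():
        parts = line.split()
        # MANAGER rows have no group column, so only EXTRACT/REPLICAT count.
        if len(parts) >= 3 and parts[0] in ("EXTRACT", "REPLICAT"):
            status[parts[2]] = parts[1]
    return status


def all_stopped(text, groups):
    """True only if every requested group is present and STOPPED."""
    status = parse_info_all(text)
    return all(status.get(g.upper()) == "STOPPED" for g in groups)
```

A group missing from the output counts as "not stopped", so a typo in the group name fails loudly instead of passing.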

3. Purge the non-DELETE rows from the existing HIS tables

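Now replace the current extract and replicat parameter files with the configurations shown at the top of this section. The next task concerns the HIS tables on the target side to which the three already-synced tables are mapped: they currently hold the full history of inserts, updates, and deletes, and only the deleted records should be kept, with all insert and update history removed.
This requires a script, which is driven by the ogg_tables_info table on the OGG side.
This table stores the relevant information for every table currently synced in the OGG environment. Its structure and contents: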

create table ogg_tables_info 
(
USER_NAME       VARCHAR2(50)  ,
TABLE_NAME      VARCHAR2(50)  ,
OGG_USER_NAME   VARCHAR2(50)  ,
OGG_TABLE_NAME  VARCHAR2(50)  ,
TABLE_COMMENT   VARCHAR2(1000),
TABLE_TYPE      VARCHAR2(50)  ,
PK_COLUMN       VARCHAR2(400) ,
PK_COLUMN_TYPE  VARCHAR2(100) ,
HIS_TABLENAME   VARCHAR2(100) ,
AUDIT_TABLENAME VARCHAR2(100) ,
FLAG_MON        VARCHAR2(1)         
);
insert into ogg_tables_info(user_name,table_name,ogg_user_name,ogg_table_name,pk_column,his_tablename,audit_tablename,flag_mon)
values('SCOTT','DEPT','SCOTT','DEPT','DEPTNO','DEPT_HIS','DEPT_AUDIT',1);
insert into ogg_tables_info(user_name,table_name,ogg_user_name,ogg_table_name,pk_column,his_tablename,audit_tablename,flag_mon)
values('SCOTT','EMP','SCOTT','EMP','EMPNO','EMP_HIS','EMP_AUDIT',1);
insert into ogg_tables_info(user_name
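Since the cleanup script itself does not appear in this text, the following is only a hypothetical sketch of how ogg_tables_info could drive it: one DELETE per HIS table, keeping only the delete records. Two loud assumptions that are not taken from the article: the HIS table lives in the OGG_USER_NAME schema, and it has an operation-type column (called OPTYPE here) whose value is 'DELETE' for the rows to keep. Check your real HIS table structure before running anything this prints.

```python
# Hypothetical generator for the step-3 cleanup. ASSUMPTIONS (not from the
# article): each HIS table lives in OGG_USER_NAME's schema and has an
# operation-type column (default name OPTYPE) that equals 'DELETE' for the
# rows worth keeping. Everything else in the table is removed.

from typing import Iterable, List, NamedTuple


class OggTableInfo(NamedTuple):
    ogg_user_name: str   # OGG_USER_NAME column of ogg_tables_info
    his_tablename: str   # HIS_TABLENAME column of ogg_tables_info


def purge_statements(rows: Iterable[OggTableInfo], op_column: str = "OPTYPE") -> List[str]:
    stmts = []
    for row in rows:
        table = f"{row.ogg_user_name.upper()}.{row.his_tablename.upper()}"
        # IS NULL is handled explicitly because NULL <> 'DELETE' is not true in SQL.
        stmts.append(
            f"DELETE FROM {table} "
            f"WHERE {op_column} IS NULL OR {op_column} <> 'DELETE';"
        )
    stmts.append("COMMIT;")
    return stmts


if __name__ == "__main__":
    rows = [OggTableInfo("SCOTT", "DEPT_HIS"), OggTableInfo("SCOTT", "EMP_HIS")]
    print("\n".join(purge_statements(rows)))
```

The rows would come from something like select ogg_user_name, his_tablename from ogg_tables_info, fetched with whatever Oracle client you use. Generating the SQL as text instead of executing it directly keeps a review step before anything is deleted.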
