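Real-Time Big Data Synchronization: An OGG Heterogeneous Multi-Target Mapping Scheme for Syncing the Base Table, an Audit Table, and a Deletes-Only Table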
Posted by 赵延东的一亩三分地
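Preface
This is a continuation. The previous article grew so long that the editor froze, so the rest is covered in this one.
Previous article: [Real-Time Big Data Synchronization] OGG Multi-Target Mapping Scheme for Syncing the Base Table, an Audit Table, and a Deletes-Only Table (Part 1)
My organization currently uses Oracle GoldenGate to consolidate the production databases of our business systems into a real-time ODS platform for the data warehouse. The source databases may include Oracle, MySQL, Dameng (达梦), and GaussDB. To support incremental ETL and to record the change history of every row:
- We use heterogeneous replication: each table replicated from the source gets an extra etltime column in the ODS, which records when the row was last changed.
- To record the transactional change history, we map the change records to a tab_name_audit table.
- To guard against accidental deletes in the source business database, we map the deleted rows to a tab_name_his table.
- The base table itself is still mapped to the ODS with normal DML replication.
To show how this is implemented, I simulate the whole setup on my local machine. For installing GoldenGate, see my other articles; it is not repeated here.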
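As a rough illustration of the routing described in the list above, the following is a toy in-memory model written in Python. It is not GoldenGate code and not the author's implementation. The class `OdsTarget`, its field names, and the choice to stamp `etltime` with the commit timestamp are all assumptions made for the sketch, and the audit table is simplified to an append-only list.

```python
from dataclasses import dataclass, field


@dataclass
class OdsTarget:
    """Hypothetical stand-in for the three ODS tables fed by one source table."""
    base: dict = field(default_factory=dict)     # current rows, keyed by primary key
    audit: list = field(default_factory=list)    # one entry per source change
    deleted: list = field(default_factory=list)  # images of deleted rows only

    def apply(self, op, key, row=None, commit_ts=None):
        """Route one source operation to the base, audit, and deletes-only tables."""
        row = row or {}
        if op == "INSERT":
            # Assumption: etltime is filled with the commit timestamp.
            self.base[key] = {**row, "etltime": commit_ts}
        elif op == "UPDATE":
            self.base[key] = {**self.base.get(key, {}), **row, "etltime": commit_ts}
        elif op == "DELETE":
            before = self.base.pop(key, None)
            if before is not None:
                # Keep the last known image so an accidental delete can be recovered.
                self.deleted.append({**before, "del_date": commit_ts})
        else:
            raise ValueError(f"unsupported operation: {op}")
        # Every operation, whatever its type, leaves a trace in the audit table.
        self.audit.append({"key": key, "optype": op, "optime": commit_ts})
```

Feeding an insert, an update, and a delete for the same key through `apply` leaves the base table empty, three entries in the audit list, and one row image in the deletes-only list, which is the behavior the mappings later in this article aim for.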
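XI. Switching the sync configuration of all replicated HIS tables under SCOTT to DEL-table operation
1. First, the configuration we want the extract and replicat processes to end up with
--------------Extract process: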
GGSCI (11g) 4> edit params e_sc
extract e_sc
userid ogg,password ogg
setenv(NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
setenv(ORACLE_SID="orcl")
reportcount every 10 minutes,rate
numfiles 5000
discardfile ./dirrpt/e_sc.dsc,append,megabytes 1000
warnlongtrans 2h,checkinterval 3m
exttrail ./dirdat/sc
threadoptions maxcommitpropagationdelay 60000
dboptions allowunusedcolumn
tranlogoptions archivedlogonly
tranlogoptions altarchivelogdest primary /u01/archivelog
TRANLOGOPTIONS ALTARCHIVEDLOGFORMAT %t_%s_%r.dbf
dynamicresolution
ddl include mapped
ddloptions addtrandata,report
notcpsourcetimer
nocompressupdates
--fetchoptions nousesnapshot
fetchoptions USEROWID
--GETUPDATEBEFORES
NOCOMPRESSDELETES
----------scott.EMP
table SCOTT.EMP,tokens(
TKN-CSN = @GETENV("TRANSACTION", "CSN"),
TKN-COMMIT-TS = @GETENV ("GGHEADER", "COMMITTIMESTAMP"),
TKN-OP-TYPE = @GETENV ("GGHEADER", "OPTYPE")
);
----------SCOTT.DEPT
table SCOTT.DEPT,tokens(
TKN-CSN = @GETENV("TRANSACTION", "CSN"),
TKN-COMMIT-TS = @GETENV ("GGHEADER", "COMMITTIMESTAMP"),
TKN-OP-TYPE = @GETENV ("GGHEADER", "OPTYPE")
);
----------SCOTT.DEL
table SCOTT.DEL,tokens(
TKN-CSN = @GETENV("TRANSACTION", "CSN"),
TKN-COMMIT-TS = @GETENV ("GGHEADER", "COMMITTIMESTAMP"),
TKN-OP-TYPE = @GETENV ("GGHEADER", "OPTYPE")
);
----------SCOTT.nopk_tab
table SCOTT.NOPK_TAB,keycols(EMPNO,ENAME),tokens(
TKN-CSN = @GETENV("TRANSACTION", "CSN"),
TKN-COMMIT-TS = @GETENV ("GGHEADER", "COMMITTIMESTAMP"),
TKN-OP-TYPE = @GETENV ("GGHEADER", "OPTYPE")
);
----------------------Replicat process configuration
----R_SC2 process
GGSCI (OGG) 25> edit params r_sc2
replicat r_sc2
setenv(NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
userid ogg,password ogg
reportcount every 30 minutes,rate
reperror default,abend
--reperror default,DISCARD
numfiles 50000
checkpointsecs 40
assumetargetdefs
discardfile ./dirrpt/r_sc2.dsc,append,megabytes 1000
allownoopupdates
ddl &
include mapped &
exclude objname scott.*_audit &
exclude optype create &
objtype 'table' &
exclude optype drop &
objtype 'table' &
exclude objtype 'index' &
objname SCOTT.*_his &
exclude instr 'constraint' &
---exclude instr 'null' &
exclude instr 'trigger' &
exclude instr 'rename to' &
exclude instr 'grant' &
exclude instr 'revoke' &
exclude instr 'analyze'
DDLOPTIONS MAPSESSIONSCHEMA XUM target ogg
ddloptions report
allowduptargetmap
----------DEPT
getinserts
getupdates
getdeletes
noupdatedeletes
NOINSERTDELETES
map SCOTT.DEPT,target SCOTT.DEPT;
updatedeletes
getinserts
ignoreupdates
ignoredeletes
map SCOTT.DEPT,target SCOTT.DEPT_AUDIT,keycols(DEPTNO),colmap(
usedefaults,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@token("TKN-OP-TYPE"),
inserttime=@token("TKN-COMMIT-TS"),
curdate=@DATENOW()
);
ignoreinserts
getupdates
getdeletes
map SCOTT.DEPT,target SCOTT.DEPT_AUDIT,keycols(DEPTNO),colmap(
usedefaults,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@token("TKN-OP-TYPE"),
curdate=@DATENOW()
),filter(@strfind(@token("TKN-OP-TYPE"),"PK UPDATE") =0);
ignoreinserts
getupdates
getdeletes
map SCOTT.DEPT,target SCOTT.DEPT_AUDIT,keycols(DEPTNO),colmap(
usedefaults,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@case(@token("TKN-OP-TYPE"),"PK UPDATE","INSERT"),
inserttime=@token("TKN-COMMIT-TS"),
curdate=@DATENOW()
),filter(@strfind(@token("TKN-OP-TYPE"),"PK UPDATE") >0),insertallrecords;
ignoreinserts
getupdates
getdeletes
map SCOTT.DEPT,target SCOTT.DEPT_AUDIT,keycols(DEPTNO),colmap(
DEPTNO=before.DEPTNO,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@case(@token("TKN-OP-TYPE"),"PK UPDATE","DELETE"),
curdate=@DATENOW()
),filter(@strfind(@token("TKN-OP-TYPE"),"PK UPDATE") >0);
ignoreinserts
ignoreupdates
getdeletes
noupdatedeletes
INSERTDELETES
MAP SCOTT.DEPT, TARGET SCOTT.DEPT_DEL,keycols(DEPTNO),colmap(
usedefaults,
BEFORE_AFTER=@getenv("GGHEADER", "BEFOREAFTERINDICATOR"),
DEL_DATE=@getenv ("GGHEADER", "COMMITTIMESTAMP"),
OP_FLAG=@getenv ("GGHEADER", "OPTYPE")
);
map SCOTT.DEPT,target SCOTT.DEPT_HIS,keycols(DEPTNO);
----------NOPK_TAB
getinserts
getupdates
getdeletes
noupdatedeletes
NOINSERTDELETES
map SCOTT.NOPK_TAB,target SCOTT.NOPK_TAB,keycols(EMPNO,ENAME);
updatedeletes
getinserts
getupdates
getdeletes
map SCOTT.NOPK_TAB,target SCOTT.NOPK_TAB_AUDIT,keycols(EMPNO,ENAME),colmap(
EMPNO=@if(@strfind(@token("tkn-op-type"),"PK UPDATE")>0,before.EMPNO,EMPNO),
ENAME=@if(@strfind(@token("tkn-op-type"),"PK UPDATE")>0,before.ENAME,ENAME),
csn=@token("tkn-csn"),
optime=@token("tkn-commit-ts"),
optype=@if(@strfind(@token("tkn-op-type"),"PK UPDATE")>0,"DELETE",@token("tkn-op-type")),
inserttime=@eval(@strfind(@token("tkn-op-type"),"INSERT")>0,@token("tkn-commit-ts")),
curdate=@DATENOW()
);
ignoreinserts
getupdates
ignoredeletes
map SCOTT.NOPK_TAB,target SCOTT.NOPK_TAB_AUDIT,keycols(EMPNO,ENAME),colmap(
usedefaults,
csn=@token("tkn-csn"),
optime=@token("tkn-commit-ts"),
optype=@case(@token("tkn-op-type"),"PK UPDATE","INSERT"),
inserttime=@token("tkn-commit-ts"),
curdate=@DATENOW()
),filter(@strfind(@token("tkn-op-type"),"PK UPDATE") >0),insertallrecords;
ignoreinserts
ignoreupdates
getdeletes
noupdatedeletes
INSERTDELETES
MAP SCOTT.NOPK_TAB, TARGET SCOTT.NOPK_TAB_DEL,keycols(EMPNO,ENAME),colmap(
usedefaults,
BEFORE_AFTER=@getenv("GGHEADER", "BEFOREAFTERINDICATOR"),
DEL_DATE=@getenv ("GGHEADER", "COMMITTIMESTAMP"),
OP_FLAG=@getenv ("GGHEADER", "OPTYPE")
);
map SCOTT.NOPK_TAB,target SCOTT.NOPK_TAB_HIS,keycols(EMPNO,ENAME);
----R_SC process:
GGSCI (OGG) 24> edit params r_sc
replicat r_sc
setenv(NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
userid ogg,password ogg
reportcount every 30 minutes,rate
reperror default,abend
numfiles 50000
checkpointsecs 40
assumetargetdefs
discardfile ./dirrpt/r_sc.dsc,append,megabytes 1000
allownoopupdates
ddl &
include mapped &
exclude objname scott.*_audit &
exclude optype create &
objtype 'table' &
exclude optype drop &
objtype 'table' &
exclude objtype 'index' &
objname SCOTT.*_his &
exclude instr 'constraint' &
---exclude instr 'null' &
exclude instr 'trigger' &
exclude instr 'rename to' &
exclude instr 'grant' &
exclude instr 'revoke' &
exclude instr 'analyze'
DDLOPTIONS MAPSESSIONSCHEMA XUM target ogg
ddloptions report
allowduptargetmap
--INSERTALLRECORDS
----------EMP
getinserts
getupdates
getdeletes
noupdatedeletes
noinsertdeletes
map SCOTT.EMP,target SCOTT.EMP;
updatedeletes
--map SCOTT.EMP,target SCOTT.EMP_HIS,keycols(EMPNO);
getinserts
ignoreupdates
ignoredeletes
map SCOTT.EMP,target SCOTT.EMP_AUDIT,keycols(EMPNO),colmap(
usedefaults,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@token("TKN-OP-TYPE"),
inserttime=@token("TKN-COMMIT-TS"),
curdate=@DATENOW()
);
ignoreinserts
getupdates
getdeletes
map SCOTT.EMP,target SCOTT.EMP_AUDIT,keycols(EMPNO),colmap(
usedefaults,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@token("TKN-OP-TYPE"),
curdate=@DATENOW()
),filter(@strfind(@token("TKN-OP-TYPE"),"PK UPDATE") =0);
ignoreinserts
getupdates
getdeletes
map SCOTT.EMP,target SCOTT.EMP_AUDIT,keycols(EMPNO),colmap(
usedefaults,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@case(@token("TKN-OP-TYPE"),"PK UPDATE","INSERT"),
inserttime=@token("TKN-COMMIT-TS"),
curdate=@DATENOW()
),filter(@strfind(@token("TKN-OP-TYPE"),"PK UPDATE") >0),insertallrecords;
ignoreinserts
getupdates
getdeletes
map SCOTT.EMP,target SCOTT.EMP_AUDIT,keycols(EMPNO),colmap(
EMPNO=before.EMPNO,
csn=@token("TKN-CSN"),
optime=@token("TKN-COMMIT-TS"),
optype=@case(@token("TKN-OP-TYPE"),"PK UPDATE","DELETE"),
curdate=@DATENOW()
),filter(@strfind(@token("TKN-OP-TYPE"),"PK UPDATE") >0);
ignoreinserts
ignoreupdates
getdeletes
noupdatedeletes
INSERTDELETES
map SCOTT.EMP,target SCOTT.EMP_HIS,keycols(EMPNO);
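2. Before starting, stop the extract process on the source side
--------------Stop the extract process
stop e_sc
Check where the extract stopped: note the "Seqno" and "RBA" under its "Log Read Checkpoint", and the "Seqno" and "RBA" under its "Target Extract Trails".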
GGSCI (11g) 7> info e_Sc detail
EXTRACT E_SC Last Started 2018-02-23 16:22 Status STOPPED
Checkpoint Lag 00:00:00 (updated 00:00:31 ago)
Log Read Checkpoint Oracle Redo Logs
2018-02-26 09:19:12 Seqno 559, RBA 52736
SCN 0.1684462 (1684462)
Target Extract Trails:
Remote Trail Name Seqno RBA Max MB
./dirdat/sc 79 18167 500
--------------Stop the data pump process
Before stopping the pump, check the "Seqno" and "RBA" of its "Log Read Checkpoint".
info D_SC detail
GGSCI (11g) 8> info D_SC detail
EXTRACT D_SC Last Started 2018-02-12 14:20 Status RUNNING
Checkpoint Lag 00:00:00 (updated 00:00:06 ago)
Log Read Checkpoint File ./dirdat/sc000079
2018-02-26 09:19:12.000000 RBA 18167
Target Extract Trails:
Remote Trail Name Seqno RBA Max MB
./dirdat/sc 56 20937 500
--------------Stop the pump only if these values match the "Seqno" and "RBA" under the extract's "Target Extract Trails" (here both are trail file 79 at RBA 18167)
stop D_SC
--------------Stop the replicat processes
Before stopping the replicats, check the "Seqno" and "RBA" of each replicat's "Log Read Checkpoint".
info r_sc detail
GGSCI (OGG) 26> info r_sc detail
REPLICAT R_SC Last Started 2018-02-26 10:08 Status RUNNING
Checkpoint Lag 00:00:00 (updated 00:00:10 ago)
Log Read Checkpoint File ./dirdat/sc000056
First Record RBA 20937
..................
info r_sc2 detail
GGSCI (OGG) 27> info r_sc2 detail
REPLICAT R_SC2 Last Started 2018-02-23 15:55 Status RUNNING
Checkpoint Lag 00:00:00 (updated 00:00:16 ago)
Log Read Checkpoint File ./dirdat/sc000056
2018-02-26 09:19:12.014722 RBA 20937
.................
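Stop the replicats only if the "Seqno" and "RBA" of both replicat processes match the values recorded earlier from the pump's "Target Extract Trails".
Here they match: both replicats are reading trail file sc000056 at RBA 20937, so the replicats can be stopped.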
stop r_sc
stop r_sc2
GGSCI (OGG) 29> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT STOPPED R_SC 00:00:00 00:00:01
REPLICAT STOPPED R_SC2 00:00:00 00:00:01
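The position checks in this step can also be scripted rather than compared by eye. The sketch below is a minimal Python helper that parses the text of `info <group> detail` and compares the upstream write position with the downstream read position. The function names are hypothetical, and the regular expressions assume the output layout shown above, including a six-digit sequence number at the end of the trail file name. Other GoldenGate versions may print these lines differently.

```python
import re


def trail_write_position(info_detail):
    """Return (seqno, rba) from the row under 'Target Extract Trails', or None."""
    m = re.search(
        r"Seqno\s+RBA\s+Max\s+MB\s*\n\s*\S+\s+(\d+)\s+(\d+)\s+\d+", info_detail
    )
    return (int(m.group(1)), int(m.group(2))) if m else None


def trail_read_position(info_detail):
    """Return (seqno, rba) from a 'Log Read Checkpoint File ...' entry, or None."""
    # Assumes the trail file name ends in a six-digit sequence number.
    m = re.search(
        r"Log Read Checkpoint\s+File\s+\S*?(\d{6})\s*\n.*?RBA\s+(\d+)", info_detail
    )
    return (int(m.group(1)), int(m.group(2))) if m else None


def caught_up(upstream_info, downstream_info):
    """True when the downstream process has read everything upstream wrote."""
    written = trail_write_position(upstream_info)
    read = trail_read_position(downstream_info)
    return written is not None and written == read
```

With the outputs captured above, `caught_up` applied to the E_SC and D_SC outputs compares (79, 18167) on both sides, and applied to the D_SC output and either replicat output compares (56, 20937) on both sides.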
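3. Purge the non-DELETE rows from the existing HIS tables
Now replace the current extract and replicat parameter files with the configurations shown at the top of this section. The next step is to clean up the HIS tables that the three replicated tables are mapped to on the replicat side, which so far hold every insert, update, and delete record.
Only the records of deleted rows are kept; the historical insert and update records are all removed. This needs a script, and the script relies on the ogg_tables_info table on the OGG side.
That table stores the information for every table currently replicated in this OGG environment. Its structure and contents: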
create table ogg_tables_info
(
USER_NAME VARCHAR2(50) ,
TABLE_NAME VARCHAR2(50) ,
OGG_USER_NAME VARCHAR2(50) ,
OGG_TABLE_NAME VARCHAR2(50) ,
TABLE_COMMENT VARCHAR2(1000),
TABLE_TYPE VARCHAR2(50) ,
PK_COLUMN VARCHAR2(400) ,
PK_COLUMN_TYPE VARCHAR2(100) ,
HIS_TABLENAME VARCHAR2(100) ,
AUDIT_TABLENAME VARCHAR2(100) ,
FLAG_MON VARCHAR2(1)
);
insert into ogg_tables_info(user_name,table_name,ogg_user_name,ogg_table_name,pk_column,his_tablename,audit_tablename,flag_mon)
values('SCOTT','DEPT','SCOTT','DEPT','DEPTNO','DEPT_HIS','DEPT_AUDIT',1);
insert into ogg_tables_info(user_name,table_name,ogg_user_name,ogg_table_name,pk_column,his_tablename,audit_tablename,flag_mon)
values('SCOTT','EMP','SCOTT','EMP','EMPNO','EMP_HIS','EMP_AUDIT',1);
insert into ogg_tables_info(user_name
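The cleanup script itself is not shown here, so what follows is only a minimal Python sketch of how the purge statements described in this step might be generated from ogg_tables_info rows. It is not the author's script. The function `purge_statements` is hypothetical, and the name of the column that stores the operation type in each HIS table is not given in this article, so it is passed in as a parameter (the value `OP_FLAG` used in the example is an assumption borrowed from the DEL-table mapping).

```python
import re

_IDENT = re.compile(r"^[A-Za-z][A-Za-z0-9_$#]*$")


def purge_statements(rows, op_column, keep_value="DELETE"):
    """Build one DELETE statement per HIS table listed in ogg_tables_info rows.

    rows: dicts with OGG_USER_NAME and HIS_TABLENAME keys, as in ogg_tables_info.
    op_column: assumed name of the HIS-table column holding the operation type.
    """
    statements = []
    for row in rows:
        owner, his_table = row.get("OGG_USER_NAME"), row.get("HIS_TABLENAME")
        if not owner or not his_table:
            continue  # no HIS table registered for this entry
        for name in (owner, his_table, op_column):
            if not _IDENT.match(name):
                raise ValueError(f"unexpected identifier: {name!r}")
        # Rows whose operation type is NULL are not matched by <> and stay in place.
        statements.append(
            f"DELETE FROM {owner}.{his_table} WHERE {op_column} <> '{keep_value}'"
        )
    return statements


# Example rows copied from the insert statements above.
rows = [
    {"OGG_USER_NAME": "SCOTT", "HIS_TABLENAME": "DEPT_HIS"},
    {"OGG_USER_NAME": "SCOTT", "HIS_TABLENAME": "EMP_HIS"},
]
for stmt in purge_statements(rows, op_column="OP_FLAG"):
    print(stmt)
```

Generating the statements first and reviewing them before running anything against the ODS keeps the destructive step explicit, which seems prudent when the goal is to delete history rows in bulk.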