主体拉链表存储过程

Posted sorliran

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了主体拉链表存储过程相关的知识,希望对你有一定的参考价值。

pro_scd_reg_marpripinfo

CREATE OR REPLACE PROCEDURE pro_scd_reg_marpripinfo IS
  n_log_id PLS_INTEGER;
BEGIN
  /*
  1.变更信息、吊销信息、注销信息
  2.根据REG_MARPRIPINFO中是否有记录判断全量或增量抽取,如无,则全量抽取,如有,则增量抽取
  */
  --REG_MARPRIPINFO主体登记信息
  DECLARE
    n_log_id PLS_INTEGER := pkg_debug.log_start(REG_MARPRIPINFO);
  BEGIN
    pkg_idx.drop_all(ODS_REG_MARPRIPINFO_LOCAL, TRUE);
    EXECUTE IMMEDIATE TRUNCATE TABLE ODS_REG_MARPRIPINFO_LOCAL;
    INSERT /*+append*/
    INTO ods_reg_marpripinfo_local
      SELECT * FROM syn_reg_marpripinfo;
    COMMIT;
    pkg_idx.create_pk(in_v_column_list => ID,TIMESTAMP,
                      in_v_table_name  => ODS_REG_MARPRIPINFO_LOCAL,
                      in_b_force       => TRUE);
    pkg_idx.create_idx(in_v_column_list => ID,APPRDATE,
                       in_v_table_name  => ODS_REG_MARPRIPINFO_LOCAL,
                       in_b_force       => TRUE);
    pkg_debug.log_end(n_log_id);
  EXCEPTION
    WHEN OTHERS THEN
      pkg_debug.log_end(n_log_id);
  END;
  DECLARE
    n_log_id PLS_INTEGER;
  BEGIN
    n_log_id := pkg_debug.log_start(ODS_REG_MARPRIPINFO);
    pkg_idx.drop_all(F_BGXX, TRUE); --变更信息
    EXECUTE IMMEDIATE TRUNCATE TABLE F_BGXX;
    --变更信息
    INSERT /*+append*/
    INTO f_bgxx
      SELECT * FROM v_bgxx;
    COMMIT;
    pkg_idx.create_pk(in_v_column_list => MARPRID,APPRDATE,
                      in_v_table_name  => F_BGXX,
                      in_b_force       => TRUE);
    --吊销信息
    pkg_idx.drop_all(F_DXXX, TRUE); --吊销信息
    EXECUTE IMMEDIATE TRUNCATE TABLE F_DXXX;
    INSERT /*+append*/
    INTO f_dxxx
      SELECT * FROM v_dxxx;
    COMMIT;
    pkg_idx.create_pk(in_v_column_list => MARPRID,APPRDATE,
                      in_v_table_name  => F_DXXX,
                      in_b_force       => TRUE);
    --注销信息
    pkg_idx.drop_all(F_ZXXX, TRUE); --注销信息
    EXECUTE IMMEDIATE TRUNCATE TABLE F_ZXXX;
    INSERT /*+append*/
    INTO f_zxxx
      SELECT * FROM v_zxxx;
    COMMIT;
    pkg_idx.create_pk(in_v_column_list => MARPRID,APPRDATE,
                      in_v_table_name  => F_ZXXX,
                      in_b_force       => TRUE);
    --改制信息
    pkg_idx.drop_all(F_GZXX, TRUE); --改制信息(内资外资互转)
    EXECUTE IMMEDIATE TRUNCATE TABLE F_GZXX;
    INSERT /*+append*/
    INTO f_gzxx
      SELECT * FROM v_gzxx;
    COMMIT;
    pkg_idx.create_pk(in_v_column_list => MARPRID,APPRDATE,
                      in_v_table_name  => F_GZXX,
                      in_b_force       => TRUE);
    --迁入迁出信息
    pkg_idx.drop_all(F_QRQC, TRUE);
    EXECUTE IMMEDIATE TRUNCATE TABLE F_QRQC;
    INSERT /*+append*/
    INTO f_qrqc
      SELECT * FROM v_qrqc;
    COMMIT;
    pkg_idx.create_pk(in_v_column_list => MARPRID,APPRDATE,
                      in_v_table_name  => F_QRQC,
                      in_b_force       => TRUE);
    pkg_debug.log_end(n_log_id);
  EXCEPTION
    WHEN OTHERS THEN
      pkg_debug.log_end(n_log_id);
  END;
  IF NOT pkg_tab.no_data_in(ODS_REG_MARPRIPINFO) THEN
    n_log_id := pkg_debug.log_start(ODS_REG_MARPRIPINFO, 每日增量);
    --更新目标表中非缓慢变化字段
    MERGE INTO ods_reg_marpripinfo dest
    USING ods_reg_marpripinfo_local src
    ON (src.id = dest.id AND to_char(src.apprdate, yyyymmdd) = to_char(dest.apprdate, yyyymmdd))
    WHEN MATCHED THEN
      UPDATE
         SET /*TIMESTAMP     = src.timestamp
             ,*/ namepreapprid = src.namepreapprid,
             entname       = src.entname,
             enttra        = src.enttra,
             grpshform     = src.grpshform,
             oplocdistrict = src.oplocdistrict,
             industryphy   = src.industryphy,
             industryco    = src.industryco,
             lerep         = src.lerep,
             regcap        = src.regcap,
             regcapcur     = src.regcapcur,
             reccap        = src.reccap,
             forreccap     = src.forreccap,
             forregcap     = src.forregcap,
             congro        = src.congro,
             dom           = src.dom,
             tel           = src.tel,
             postalcode    = src.postalcode,
             email         = src.email,
             abuitemco     = src.abuitemco,
             opscope       = src.opscope,
             ptbusscope    = src.ptbusscope,
             enttype       = src.enttype,
             enttypeitem   = src.enttypeitem,
             enttypeminu   = src.enttypeminu,
             opfrom        = src.opfrom,
             opto          = src.opto,
             regno         = src.regno,
             oldregno      = src.oldregno,
             forregno      = src.forregno,
             supervper     = src.supervper,
             superorgid    = src.superorgid,
             estdate       = src.estdate
             --apprdate=src.apprdate
            ,
             perid         = src.perid,
             accopin       = src.accopin,
             remark        = src.remark,
             state         = src.state,
             orgid         = src.orgid,
             jobid         = src.jobid,
             adbusign      = src.adbusign,
             townsign      = src.townsign,
             regtype       = src.regtype,
             priorgid      = src.priorgid,
             superpriorgid = src.superpriorgid,
             approrgid     = src.approrgid,
             enttypepro    = src.enttypepro,
             optype        = src.optype,
             empnum        = src.empnum,
             compform      = src.compform,
             supdistrict   = src.supdistrict,
             venind        = src.venind,
             parnum        = src.parnum,
             exenum        = src.exenum,
             opform        = src.opform,
             insform       = src.insform,
             hypotaxis     = src.hypotaxis,
             forcapindcode = src.forcapindcode,
             midpreindcode = src.midpreindcode,
             protype       = src.protype,
             impdatesign   = src.impdatesign,
             oploc         = src.oploc,
             copynum       = src.copynum,
             enttypegb     = src.enttypegb,
             compformgb    = src.compformgb,
             hotindfocus   = src.hotindfocus,
             parform       = src.parform,
             industryphygb = src.industryphygb,
             industrycogb  = src.industrycogb,
             appperid      = src.appperid,
             uniscid       = src.uniscid
       WHERE src.timestamp > dest.timestamp
         and src.estdate = dest.estdate;
    COMMIT;
    --当目标表中已存在ID相同且核准日期<源表核准日期,将插入新记录
    INSERT /*+append*/
    INTO ods_reg_marpripinfo nologging
      SELECT src.*,
             NULL    AS bgxm,
             NULL    AS dxrq,
             NULL    AS zxrq,
             NULL    AS old_qylx,
             NULL    AS old_djjg,
             sysdate as cqrq
        FROM ods_reg_marpripinfo_local src
       WHERE exists (select 1
                from (SELECT t.*,
                             row_number() over(PARTITION BY t.id ORDER BY timestamp desc) rn
                        FROM ods_reg_marpripinfo t) t
               where t.rn = 1
                 and src.id = t.id
                 and src.estdate <> t.estdate)
          or apprdate > (SELECT MAX(apprdate)
                           FROM ods_reg_marpripinfo dest
                          WHERE dest.id = src.id) log errors INTO
       ods_reg_marpripinfo_err;
    COMMIT;
    --当源表中有新增的企业时,直接插入新数据
    INSERT /*+append*/
    INTO ods_reg_marpripinfo nologging
      SELECT id,
             TIMESTAMP,
             namepreapprid,
             entname,
             enttra,
             grpshform,
             oplocdistrict,
             industryphy,
             industryco,
             lerep,
             regcap,
             regcapcur,
             reccap,
             forreccap,
             forregcap,
             congro,
             dom,
             tel,
             postalcode,
             email,
             abuitemco,
             opscope,
             ptbusscope,
             enttype,
             enttypeitem,
             enttypeminu,
             opfrom,
             opto,
             regno,
             oldregno,
             forregno,
             supervper,
             superorgid,
             estdate,
             apprdate,
             perid,
             accopin,
             remark,
             state,
             orgid,
             jobid,
             adbusign,
             townsign,
             regtype,
             priorgid,
             superpriorgid,
             approrgid,
             enttypepro,
             optype,
             empnum,
             compform,
             supdistrict,
             venind,
             parnum,
             exenum,
             opform,
             insform,
             hypotaxis,
             forcapindcode,
             midpreindcode,
             protype,
             impdatesign,
             oploc,
             copynum,
             enttypegb,
             compformgb,
             hotindfocus,
             parform,
             industryphygb,
             industrycogb,
             appperid,
             uniscid,
             NULL          AS bgxm --变更项目
            ,
             NULL          AS zxrq --注销日期
            ,
             NULL          AS dxrq --吊销日期
            ,
             NULL          AS old_qylx --原企业类型
            ,
             NULL          AS old_djjg --原登记机关
            ,
             sysdate       as cqrq
        FROM ods_reg_marpripinfo_local src
       WHERE NOT EXISTS (SELECT NULL
                FROM ods_reg_marpripinfo dest
               WHERE src.id = dest.id);
    COMMIT;
    --将迁出户不存在于目标表的记录追加到目标表
    INSERT INTO ods_reg_marpripinfo
      (id,
       TIMESTAMP,
       namepreapprid,
       entname,
       enttra,
       grpshform,
       oplocdistrict,
       industryphy,
       industryco,
       lerep,
       regcap,
       regcapcur,
       reccap,
       forreccap,
       forregcap,
       congro,
       dom,
       tel,
       postalcode,
       email,
       abuitemco,
       opscope,
       ptbusscope,
       enttype,
       enttypeitem,
       enttypeminu,
       opfrom,
       opto,
       regno,
       oldregno,
       forregno,
       supervper,
       superorgid,
       estdate,
       apprdate,
       perid,
       accopin,
       remark,
       state,
       orgid,
       jobid,
       adbusign,
       townsign,
       regtype,
       priorgid,
       superpriorgid,
       approrgid,
       enttypepro,
       optype,
       empnum,
       compform,
       supdistrict,
       venind,
       parnum,
       exenum,
       opform,
       insform,
       hypotaxis,
       forcapindcode,
       midpreindcode,
       protype,
       impdatesign,
       oploc,
       copynum,
       enttypegb,
       uniscid,
       appperid,
       compformgb,
       hotindfocus,
       industrycogb,
       industryphygb,
       parform,
       cqrq)
      SELECT id,
             estdate,
             namepreapprid,
             entname,
             enttra,
             grpshform,
             oplocdistrict,
             industryphy,
             industryco,
             lerep,
             regcap,
             regcapcur,
             reccap,
             forreccap,
             forregcap,
             congro,
             dom,
             tel,
             postalcode,
             email,
             abuitemco,
             opscope,
             ptbusscope,
             enttype,
             enttypeitem,
             enttypeminu,
             opfrom,
             opto,
             regno,
             oldregno,
             forregno,
             supervper,
             superorgid,
             estdate,
             apprdate,
             perid,
             accopin,
             remark,
             09 state --对迁出户状态设为09
            ,
             999999 orgid --对迁出户主管机关设为999999
            ,
             jobid,
             adbusign,
             townsign,
             regtype,
             priorgid,
             superpriorgid,
             approrgid,
             enttypepro,
             optype,
             empnum,
             compform,
             supdistrict,
             venind,
             parnum,
             exenum,
             opform,
             insform,
             hypotaxis,
             forcapindcode,
             midpreindcode,
             protype,
             impdatesign,
             oploc,
             copynum,
             enttypegb,
             uniscid,
             NULL AS appperid,
             NULL AS compformgb,
             NULL AS hotindfocus,
             NULL AS industrycogb,
             NULL AS industryphygb,
             NULL AS parform,
             sysdate as cqrq
        FROM ods_reg_marpripinfo_inv inv
       WHERE id NOT IN (SELECT id FROM ods_reg_marpripinfo dest);
    COMMIT;
    --更新目标表的变更项目字段
    MERGE INTO ods_reg_marpripinfo dest
    USING f_bgxx bgxx
    ON (dest.id = bgxx.marprid AND dest.apprdate = bgxx.apprdate)
    WHEN MATCHED THEN
      UPDATE SET bgxm = bgxx.altitem;
    COMMIT;
    --更新目标表的吊销日期字段
    MERGE INTO ods_reg_marpripinfo dest
    USING f_dxxx dxxx
    ON (dest.id = dxxx.marprid AND dest.apprdate = dxxx.apprdate)
    WHEN MATCHED THEN
      UPDATE SET dxrq = dxxx.apprdate;
    --更新目标表的是否注销字段
    MERGE INTO ods_reg_marpripinfo dest
    USING f_zxxx zxxx
    ON (dest.id = zxxx.marprid AND dest.apprdate = zxxx.apprdate)
    WHEN MATCHED THEN
      UPDATE SET zxrq = zxxx.apprdate;
    COMMIT;
    --更新目标表的内转外标志
    MERGE INTO ods_reg_marpripinfo dest
    USING f_gzxx gzxx
    ON (dest.id = gzxx.marprid AND dest.apprdate = gzxx.apprdate)
    WHEN MATCHED THEN
      UPDATE SET old_qylx = gzxx.enttype_old;
    pkg_debug.log_end(n_log_id);
    --更新目标表的原主管机构
    MERGE INTO ods_reg_marpripinfo dest
    USING f_qrqc qrqc
    ON (dest.id = qrqc.marprid AND dest.apprdate = qrqc.apprdate)
    WHEN MATCHED THEN
      UPDATE SET old_djjg = qrqc.moutareregorgid;
  ELSE
    --如果为全量抽取,则TRUNCATE ODS_REG_MARPRIPINFO并重新抽取数据
    n_log_id := pkg_debug.log_start(ODS_REG_MARPRIPINFO, 首次全量);
    pkg_idx.drop_all(ODS_REG_MARPRIPINFO);
    EXECUTE IMMEDIATE TRUNCATE TABLE ODS_REG_MARPRIPINFO;
    INSERT /*+append*/
    INTO ods_reg_marpripinfo nologging
      (id,
       TIMESTAMP,
       namepreapprid,
       entname,
       enttra,
       grpshform,
       oplocdistrict,
       industryphy,
       industryco,
       lerep,
       regcap,
       regcapcur,
       reccap,
       forreccap,
       forregcap,
       congro,
       dom,
       tel,
       postalcode,
       email,
       abuitemco,
       opscope,
       ptbusscope,
       enttype,
       enttypeitem,
       enttypeminu,
       opfrom,
       opto,
       regno,
       oldregno,
       forregno,
       supervper,
       superorgid,
       estdate,
       apprdate,
       perid,
       accopin,
       remark,
       state,
       orgid,
       jobid,
       adbusign,
       townsign,
       regtype,
       priorgid,
       superpriorgid,
       approrgid,
       enttypepro,
       optype,
       empnum,
       compform,
       supdistrict,
       venind,
       parnum,
       exenum,
       opform,
       insform,
       hypotaxis,
       forcapindcode,
       midpreindcode,
       protype,
       impdatesign,
       oploc,
       copynum,
       enttypegb,
       compformgb,
       hotindfocus,
       parform,
       industryphygb,
       industrycogb,
       appperid,
       uniscid,
       bgxm,
       dxrq,
       zxrq,
       cqrq)
      SELECT id,
             estdate       TIMESTAMP,
             namepreapprid,
             entname,
             enttra,
             grpshform,
             oplocdistrict,
             industryphy,
             industryco,
             lerep,
             regcap,
             regcapcur,
             reccap,
             forreccap,
             forregcap,
             congro,
             dom,
             tel,
             postalcode,
             email,
             abuitemco,
             opscope,
             ptbusscope,
             enttype,
             enttypeitem,
             enttypeminu,
             opfrom,
             opto,
             regno,
             oldregno,
             forregno,
             supervper,
             superorgid,
             estdate,
             apprdate,
             perid,
             accopin,
             remark,
             state,
             orgid,
             jobid,
             adbusign,
             townsign,
             regtype,
             priorgid,
             superpriorgid,
             approrgid,
             enttypepro,
             optype,
             empnum,
             compform,
             supdistrict,
             venind,
             parnum,
             exenum,
             opform,
             insform,
             hypotaxis,
             forcapindcode,
             midpreindcode,
             protype,
             impdatesign,
             oploc,
             copynum,
             enttypegb,
             compformgb,
             hotindfocus,
             parform,
             industryphygb,
             industrycogb,
             appperid,
             uniscid,
             NULL          AS bgxm,
             NULL          AS sfdx,
             NULL          AS sfzx,
             sysdate       as cqrq
        FROM ods_reg_marpripinfo_local;
    COMMIT;
    pkg_idx.create_pk(in_v_column_list => ID,APPRDATE,
                      in_v_table_name  => ODS_REG_MARPRIPINFO_HZRQ);
    pkg_idx.create_idx(in_v_column_list => ID,TIMESTAMP,
                       in_v_table_name  => ODS_REG_MARPRIPINFO_HZRQ);
    pro_scd_reg_marpripinfo;
    pkg_debug.log_end(n_log_id);
  END IF;
EXCEPTION
  WHEN OTHERS THEN
    pkg_debug.log_end(n_log_id);
END;

 

以上是关于主体拉链表存储过程的主要内容,如果未能解决你的问题,请参考以下文章

大数据Hive3.x数仓开发数仓中数据发生变化如何实现数据存储--拉链表详解

数仓-拉链表的详细实现过程

分区拉链表

数据仓库:拉链表详解

拉链表

如何在Impala中实现拉链表