基于“Doris”的type2拉链表的Mysql实现

Posted ShenLiang2025

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于“Doris”的type2拉链表的Mysql实现相关的知识,希望对你有一定的参考价值。

基于“Doris”的type2拉链表的mysql实现

需求说明

基于Doris实现Type2、拉链表。主要对上游系统里的面积字段进行监控,如果发现变化则跟踪记录到维度表里。

解决方案

type2相关概念见如下链接:

SCD缓慢变化维拉链表

这里特别需要注意的是:

1、因为Doris不支持多表关联的update和delete且在type2实现时需要保证错位的endtime和starttime一致,因此直接在Doris里实现不现实。

2、该方法实际的type2实现是在mysql里,通过同步的方式再回到Doris内。

实施步骤

编写Type2存储过程

#Step1 在Mysql里编写Type2的存储过程

CREATE PROCEDURE `type2pro`()
BEGIN
  DECLARE currtime datetime default now();
	
	DROP TABLE IF EXISTS tmp_stationdate;
	CREATE TABLE `tmp_stationdate`
	(
  `src` varchar(30),
  `stationid` varchar(100) COLLATE utf8_bin DEFAULT NULL,
  `stationname` varchar(100) COLLATE utf8_bin DEFAULT NULL,
  `heatingarea` decimal(12,2) DEFAULT NULL,
  `normalheatingarea` decimal(12,2) DEFAULT NULL,
  `extraheatingarea` decimal(12,2) DEFAULT NULL,
  `constructionarea` decimal(12,2) DEFAULT NULL,
  `chargearea` decimal(12,2) DEFAULT NULL,
  `ispush` varchar(10) COLLATE utf8_bin DEFAULT NULL,
  `iscalculate` varchar(10) COLLATE utf8_bin DEFAULT NULL,
  `participationcalculation` varchar(10) COLLATE utf8_bin DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

 -- # 插入临时表,保存之前最新的记录
 INSERT INTO tmp_stationdate
 			SELECT 'hefei',C.stationid,C.stationname,C.heatingarea,C.normalheatingarea,C.extraheatingarea,C.constructionarea,C.chargearea, 
			C.ispush,C.iscalculate,C.participationcalculation
			FROM dim_station C 
			LEFT JOIN g_station B
			ON B.uniqueid = C.stationid AND B.heatingarea = C.heatingarea AND B.normalheatingarea = C.normalheatingarea
			WHERE  (B.heatingarea IS NULL OR B.normalheatingarea IS NULL) AND C.iscurrent = 1;
 
	
    -- # 更新之前的维度变换的数据,即iscurrent=1的endtime更新为当前修改时间
	UPDATE dim_station C 
	LEFT JOIN g_station B
	ON B.uniqueid = C.stationid AND B.heatingarea = C.heatingarea AND B.normalheatingarea = C.normalheatingarea
	SET C.endtime = currtime,C.iscurrent = 0
	WHERE  (B.heatingarea IS NULL OR B.normalheatingarea IS NULL)  AND C.iscurrent = 1;

	
	-- # 插入维度变换的数据,并置最新的iscurrent为1
	INSERT INTO dim_station(
	stationid , stationname,starttime,endtime,iscurrent,heatingarea,normalheatingarea,extraheatingarea,constructionarea,chargearea,ispush,iscalculate,participationcalculation
	)
	SELECT E.uniqueid AS stationid,D.stationname,currtime starttime,STR_TO_DATE('9999-12-31 23:59:59','%Y-%m-%d %H:%i:%s') endtime,1 iscurrent,E.heatingarea,E.normalheatingarea,E.extraheatingarea,E.constructionarea,E.chargearea,E.ispush,E.iscalculate,E.participationcalculation
	FROM tmp_stationdate D
	JOIN g_station E
	ON D.stationid = E.uniqueid;
	
	-- # 插入新增的维度数据并置iscurrent为1
	INSERT INTO dim_station(
	stationid , stationname,starttime,endtime,iscurrent,heatingarea,normalheatingarea,extraheatingarea,constructionarea,chargearea,ispush,iscalculate,participationcalculation
	)
	SELECT A.uniqueid AS stationid,A.name stationname,currtime starttime,STR_TO_DATE('9999-12-31 23:59:59','%Y-%m-%d %H:%i:%s') endtime,1 iscurrent,A.heatingarea,A.normalheatingarea,A.extraheatingarea,A.constructionarea,A.chargearea,A.ispush,A.iscalculate,A.participationcalculation 
	FROM g_station A
	LEFT JOIN dim_station B
	ON A.uniqueid = B.stationid
	WHERE B.stationid IS NULL;
END

编写调用Shell脚本

#Step2 定义Type2调用及数据同步的Shell

#!/bin/bash
port="9030"
username="root"
passwd="Mysql#2023" 
dbname="businessdb"
hostname="192.168.2.50"
labelval=`date  "+%Y%m%d-%H%M%S"`


# 1 Doris数据入本地文件
mysqldump -h192.168.2.50 -P9030 -uroot -p888888 --no-create-info --no-tablespaces --databases dorisdb --tables station > '/root/workspace/station.sql'
mysqldump -h192.168.2.50 -P9030 -uroot -p888888 --no-create-info --no-tablespaces --databases dorisdb --tables dim_station > '/root/workspace/dim_station.sql'

# 2 Mysql清空原有表数据
mysql -uroot -p$passwd -D$dbname -e 'TRUNCATE TABLE station;'
mysql -uroot -p$passwd -D$dbname -e 'TRUNCATE TABLE dim_station;'

# 3 Mysql本地文件入库
mysql -uroot -p$passwd -D$dbname < '/root/workspace/station.sql'
mysql -uroot -p$passwd -D$dbname < '/root/workspace/dim_station.sql'

# 4 Mysql执行存储过程生成Type2
mysql -uroot -p$passwd -D$dbname -e 'call type2pro()'

# 5 删除已经存在的本地文件
rm -rf /var/lib/mysql-files/dim_station.txt

# 6 Mysql导出本地文件,默认字段分割符是制表符"\\t"
mysql -uroot -p'Runa#2020' -D$dbname -e "SELECT * FROM  dim_station INTO OUTFILE '/var/lib/mysql-files/dim_station.txt'";

# 7 清空Doris库表数据(修改dim_station表结构,不再进行表清空操作)
mysql -h192.168.2.50 -P9030 -uroot -p888888 -Ddorisdb -e "TRUNCATE TABLE dim_station;"

# 8 load到Doris库里,这里的%09代表制表符"\\t"
curl --location-trusted -u root:888888 -T /var/lib/mysql-files/dim_station.txt http://192.168.2.50:8030/api/zaozhuang/dim_station/_load?label=dim_station_$labelval&column_separator=%09

编辑Crontab调度

#Step3 Crontab里定义调度

crontab -e新增调度,这里是每隔2个小时执行一次。

0 */2 * * * /root/type2 >/root/type2.log2>&1

以上是关于基于“Doris”的type2拉链表的Mysql实现的主要内容,如果未能解决你的问题,请参考以下文章

hive中拉链表的设计

Doris 基于 Hive 表的全局字典设计与实现

Doris和Mysql的批同步测试

数据仓库之拉链表设计

哔哩哔哩基于拉链表的全量表极限存储优化方案

[数据结构] 哈希表 (开放寻址法+拉链法)