Mysql数据 抽取(extract)转换(transform)加载(load)实战
Posted 清_澈
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Mysql数据 抽取(extract)转换(transform)加载(load)实战相关的知识,希望对你有一定的参考价值。
1、场景一:当基础数据表的数据量比较大(300万以上),需要关联多个表(数据量在100万左右)时,多表left join容易超时,需要分步汇总。
insert into lkg.lkg_source_population (rowguid,
Name,
Sex,
BirthDay,
Nation,
BirthPlace,
Marriage,
PartyOrgName,
Education,
Religion,
ContactsPhone,
NowDistrict,
NowAddress,
GridName,
HOUSEHOLD_NO,
Relation
)
select a.IDCard,
a.Name,
case when a.Sex='DBF2523E-9D19-465D-980A-280F2AA501C6' then '02'
when a.Sex='1F8A8A4C-2013-4988-9AB8-29CD3FB4F11A' then '01' end as Sex ,
a.BirthDate,
case when a.Nation='3434A21E-E783-413E-BAD6-3777DB12D8FA' then '01' end as Nation ,
a.NativePlace as BirthPlace,
a.MaritalStatus as Marriage,
a.PoliticsStatus as PartyOrgName,
a.Education as Education,
a.ReligiousFaith as Religion,
a.Phone as ContactsPhone,
b.DomicileCode as NowDistrict,
b.DomicileDetail as NowAddress,
c.Name as GridName,
e.HuHao as HOUSEHOLD_NO,
d.Yhzgx as Relation
from test.WG_People_Resident a
left join test.WG_People_Household b on a.Id = b.ResidentID
left join test.organize c on c.id =b.AreaID
left join test.wg_hjyrkglb d on d.ResidentID = a.id
left join test.WG_RegisterFamily e on e.ID = d.HhID
数据量大时上边的sql容易造成超时中断,分步汇总步骤如下:
创建临时表,先汇总两个表的数据:
CREATE TABLE temp_people_resident
select t1.IDCard,
t1.HouseId as Bcellid,
t1.Name,
t1.Sex,
t1.BirthDate,
t1.Nation,
t1.NativePlace,
t1.MaritalStatus,
t1.PoliticsStatus,
t1.Education,
t1.ReligiousFaith,
t1.Phone,
t2.CreateDate,
t2.DomicileCode,
t2.DomicileDetail,
t2.ResidentID,
t2.AreaID,
null as HOUSE_ID,
null as BUILDING_ID,
null as HhID,
null as GRID_CODE,
null as GRID_NAME,
null as HuHao,
null as Yhzgx
from test.wg_people_resident t1
left JOIN
(select ResidentID,DomicileCode,DomicileDetail,AreaID,CreateDate from test.wg_people_household) t2 on t1.ID = t2.ResidentID;
去重(按身份证号保留最新的一条)操作:
DELETE
FROM
test.temp_people_resident t
WHERE
IDCard NOT IN (
SELECT
t.IDCard
FROM
(
SELECT
IDCard
FROM
test.temp_people_resident t1
WHERE
t1.CreateDate = ( SELECT max( t2.CreateDate ) FROM test.temp_people_resident t2 WHERE t1.IDCard = t2.IDCard )
) t
)
或者按照id去重:
delete from base_household_population_final where id not in (
select t.max_id from
(select max(id) as max_id from base_household_population_final group by ID_CARD,GRID_CODE) as t
);
逐个关联更新其他表:
update test.temp_people_resident a left join test.organize c on c.id =a.AreaID set a.GRID_CODE = c.zoningCode, a.GRID_NAME = c.Name;
update test.temp_people_resident a left join test.wg_hjyrkglb d on d.ResidentID = a.ResidentID set a.HhID = d.HhID, a.Yhzgx = d.Yhzgx;
update test.temp_people_resident a left join test.WG_RegisterFamily d on d.ID = a.HhID set a.HuHao = d.HuHao;
update test.temp_people_resident a left join test.wg_bcell d on d.Bcellid = a.Bcellid set a.HOUSE_ID = d.HOUSE_ID, a.BUILDING_ID = d.BUILDING_ID;
至此汇总完毕。
2、场景二:通过存储过程和数据库定时任务自动执行数据倒库的工作
新建存储过程:
利用mysql的ON DUPLICATE KEY UPDATE操作,将数据灌倒指定表
CREATE DEFINER=`root`@`%` PROCEDURE `event_unicom_to_dsj`()
BEGIN
insert into zbzf_data_bak.events_business (
guid,
event_id,
event_code,
event_name,
event_class_code,
event_reason,
event_description,
event_time,
event_alarm_time,
event_submit_time,
event_region_code,
event_detail_address,
event_longitude,
event_latitude,
event_level_code,
event_status_code,
event_has_attachment,
event_submiter,
event_submit_phone,
event_submit_orgcode,
event_submit_orgname,
event_submit_source,
create_time )
select
concat('zblt',substring(t1.EVENT_ID,4,20),UNIX_TIMESTAMP(t1.REPORT_TIME) ) as guid,
(case when LENGTH(t1.EVENT_ID)>36 then substring(t1.EVENT_ID,1,36)
else t1.EVENT_ID end) as event_id,
concat(left(t1.GRID_CODE,8),getEventCode(t1.EVENT_CATEGORY),replace(CURDATE(),'-',''),t1.EVENT_CATEGORY,right(UNIX_TIMESTAMP(t1.REPORT_TIME),6),1 ) as event_code,
t1.EVENT_NAME,
getEventCode(t1.EVENT_CATEGORY) as event_class_code,
t1.EVENT_DESCRIPTION as event_reason,
t1.EVENT_DESCRIPTION as event_description,
t1.REPORT_TIME as event_time,
t1.HAPPEN_DATE as event_alarm_time,
t1.COMPLETE_TIME as event_submit_time,
left(t1.GRID_CODE,8) as event_region_code,
t1.HAPPEN_PLACE as event_detail_address,
t1.LNG as event_longitude,
t1.LAT as event_latitude,
t1.COMPLETE_LEVEL as event_level_code,
case t1.SUCCESSFUL_OR_NO when 'Y' then '4'
when 'N' then '1'
else '2' end as event_status_code,
0 as event_has_attachment,
substring(t1.REAPORTPERSON,0,20) as event_submiter,
t2.PHONENUMBER as event_submit_phone,
t1.GRID_CODE as event_submit_orgcode,
t1.GRID_NAME as event_submit_orgname,
1100 as event_submit_source,
t1.UPDATE_DATE as create_time
from sd_unicom_node_bak.BASE_DISPUTE_PROCESS t1
LEFT JOIN sd_unicom_node_bak.BASE_GRID_USER t2 on t1.REAPORTPERSON = t2.account
where t1.UPDATE_DATE > DATE_SUB(NOW(),INTERVAL 1 DAY)
ON DUPLICATE KEY UPDATE
EVENT_ID=VALUES(EVENT_ID),
event_code=VALUES(event_code),
event_name=VALUES(event_name),
event_reason=VALUES(event_reason),
event_description=VALUES(event_description),
event_time=VALUES(event_time),
event_alarm_time=VALUES(event_alarm_time),
event_submit_time=VALUES(event_submit_time),
event_region_code=VALUES(event_region_code),
event_detail_address=VALUES(event_detail_address),
event_level_code=VALUES(event_level_code),
event_submiter=VALUES(event_submiter),
event_submit_phone=VALUES(event_submit_phone),
event_submit_orgcode=VALUES(event_submit_orgcode),
event_submit_orgname=VALUES(event_submit_orgname),
event_submit_source=VALUES(event_submit_source),
create_time=VALUES(create_time),
event_longitude=VALUES(event_longitude),
event_latitude=VALUES(event_latitude),
event_class_code=VALUES(event_class_code),
event_status_code=VALUES(event_status_code),
event_has_attachment=VALUES(event_has_attachment)
;
END
新建存储过程中用到的getEventCode()函数:
CREATE DEFINER=`root`@`%` FUNCTION `getEventCode`(myNum INTEGER) RETURNS int(11)
BEGIN
DECLARE val INTEGER ;
SELECT
(case myNum
when '1' then '41100'
when '0101' then '41101'
when '0102' then '41102'
when '0103' then '41199'
when '0199' then '41199'
when '2' then '41304'
when '0201' then '41304'
when '0202' then '41304'
when '0203' then '41304'
when '0204' then '41304'
when '0205' then '41603'
when '0206' then '41304'
when '3' then '41200'
when '0301' then '41201'
when '0302' then '41201'
when '0303' then '41202'
when '0304' then '41203'
when '0305' then '41204'
when '0306' then '41205'
when '0307' then '41206'
when '0308' then '41207'
when '0309' then '41208'
when '0310' then '41209'
when '0311' then '41210'
when '0312' then '41211'
when '0313' then '41211'
when '0314' then '41299'
when '0315' then '41299'
when '0399' then '41299'
when '4' then '41300'
when '0401' then '41301'
when '0402' then '41301'
when '0403' then '41301'
when '0404' then '41301'
when '0405' then '41302'
when '0406' then '41214'
when '0407' then '40101'
when '0408' then '49901'
when '0499' then '49901'
when '5' then '41400'
when '0501' then '41401'
when '0502' then '41402'
when '0503' then '41499'
when '0599' then '41499'
when '6' then '21500'
when '0601' then '21500'
when '7' then '41500'
when '0701' then '41501'
when '0702' then '41502'
when '0703' then '41504'
when '0704' then '41305'
when '799' then '41599'
when '8' then '49900'
when '0801' then '49900'
when '0802' then '49900'
when '0899' then '49900'
when '9' then '41600'
when '0901' then '41601'
when '0902' then '41602'
when '0903' then '41699'
when '0904' then '41604'
when '0999' then '41699'
else '41699' end
) into val ;
return val ;
END
启动定时任务,每6小时执行一次:
//启动定时任务
DROP EVENT IF EXISTS event_unicom_to_dsj;
CREATE EVENT event_unicom_to_dsj
ON SCHEDULE EVERY 6 hour STARTS TIMESTAMP '2022-04-01 01:00:00'
ON COMPLETION PRESERVE
DO
BEGIN
CALL event_unicom_to_dsj();
END
;
Mysql开启binlog:
SELECT @@sql_mode
my.cnf 修改如下:
sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
#开启binlog
binlog_format=row #// 行模式
binlog_checksum=none #//none取消CRC校验,crc32带crc校验
log_bin = /var/lib/mysql/mysqlr-bin #// 开启binlog,log文件路径
event_scheduler=ON;
新建mysql用户并赋权只能访问某数据库
create user 'zbzd_user'@'%' identified by 'PuYkpt!@956';
grant all privileges on zd_node_bak.* to zbzd_user@'%'identified by 'PuYkpt!@956';
FLUSH PRIVILEGES;
新建linux用户赋root权限
useradd xindian
passwd xindian
修改/etc/sudoers文件,找到下面一行,
## Allow root to run any commands anywhere
root ALL=(ALL) ALL
在root下面添加一行,如下所示:
xindian ALL=(ALL) ALL
以上是关于Mysql数据 抽取(extract)转换(transform)加载(load)实战的主要内容,如果未能解决你的问题,请参考以下文章