Mysql数据 抽取(extract)转换(transform)加载(load)实战

Posted 清_澈

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Mysql数据 抽取(extract)转换(transform)加载(load)实战相关的知识,希望对你有一定的参考价值。

表格格式化(在 mysql 命令行客户端中通过 less 分页显示查询结果,宽表不会折行错乱):

 -- mysql client command (not SQL): page query results through less so wide result tables stay readable; see `man less` for -S, -F and -X.
 pager less -SFX;

1、场景一:当基础数据表的数据量比较大(300万以上),需要关联多个表(数据量在100万左右)时,多表left join容易超时,需要分步汇总。

-- One-shot version: load residents into lkg.lkg_source_population by LEFT JOINing the
-- household, grid (organize), household-member and household-register tables in one query.
-- On large inputs this is the query that times out; the step-by-step version follows.
-- NOTE(review): a resident with several household / member rows yields several output
-- rows here (there is no deduplication) — confirm this is acceptable for the target table.
INSERT INTO lkg.lkg_source_population (
    rowguid,
    Name,
    Sex,
    BirthDay,
    Nation,
    BirthPlace,
    Marriage,
    PartyOrgName,
    Education,
    Religion,
    ContactsPhone,
    NowDistrict,
    NowAddress,
    GridName,
    HOUSEHOLD_NO,
    Relation
)
SELECT
    a.IDCard,                                        -- the ID-card number is stored as rowguid
    a.Name,
    -- Sex is stored as a dictionary GUID; map the two known GUIDs to codes, anything else -> NULL
    CASE
        WHEN a.Sex = 'DBF2523E-9D19-465D-980A-280F2AA501C6' THEN '02'
        WHEN a.Sex = '1F8A8A4C-2013-4988-9AB8-29CD3FB4F11A' THEN '01'
    END AS Sex,
    a.BirthDate,
    -- only one Nation GUID is mapped ('01'); every other value becomes NULL
    CASE
        WHEN a.Nation = '3434A21E-E783-413E-BAD6-3777DB12D8FA' THEN '01'
    END AS Nation,
    a.NativePlace    AS BirthPlace,
    a.MaritalStatus  AS Marriage,
    a.PoliticsStatus AS PartyOrgName,
    a.Education      AS Education,
    a.ReligiousFaith AS Religion,
    a.Phone          AS ContactsPhone,
    b.DomicileCode   AS NowDistrict,
    b.DomicileDetail AS NowAddress,
    c.Name           AS GridName,
    e.HuHao          AS HOUSEHOLD_NO,
    d.Yhzgx          AS Relation
FROM test.WG_People_Resident AS a
LEFT JOIN test.WG_People_Household AS b
    ON b.ResidentID = a.Id
LEFT JOIN test.organize AS c
    ON c.id = b.AreaID
LEFT JOIN test.wg_hjyrkglb AS d
    ON d.ResidentID = a.id
LEFT JOIN test.WG_RegisterFamily AS e
    ON e.ID = d.HhID;

数据量大时上边的sql容易造成超时中断,分步汇总步骤如下:

创建中间表(普通表,并非 CREATE TEMPORARY TABLE 临时表),先汇总居民表和户籍表这两个表的数据:

-- Step 1: build the staging table from residents LEFT JOIN households.
-- It is created in the `test` schema explicitly because every later statement refers to it
-- as test.temp_people_resident (an unqualified name would depend on the current database).
-- The columns filled later by the UPDATE steps are created here as empty placeholders.
CREATE TABLE test.temp_people_resident
SELECT
    t1.IDCard,
    t1.HouseId AS Bcellid,
    t1.Name,
    t1.Sex,
    t1.BirthDate,
    t1.Nation,
    t1.NativePlace,
    t1.MaritalStatus,
    t1.PoliticsStatus,
    t1.Education,
    t1.ReligiousFaith,
    t1.Phone,
    t2.CreateDate,
    t2.DomicileCode,
    t2.DomicileDetail,
    t2.ResidentID,
    t2.AreaID,
    -- A bare NULL in CREATE TABLE ... SELECT produces a BINARY(0) column, which cannot hold the
    -- values written by the later UPDATEs (error under STRICT_TRANS_TABLES), so cast to a string type.
    -- NOTE(review): CHAR(255) is a placeholder size — align with the source columns' real types.
    CAST(NULL AS CHAR(255)) AS HOUSE_ID,
    CAST(NULL AS CHAR(255)) AS BUILDING_ID,
    CAST(NULL AS CHAR(255)) AS HhID,
    CAST(NULL AS CHAR(255)) AS GRID_CODE,
    CAST(NULL AS CHAR(255)) AS GRID_NAME,
    CAST(NULL AS CHAR(255)) AS HuHao,
    CAST(NULL AS CHAR(255)) AS Yhzgx
FROM test.wg_people_resident AS t1
LEFT JOIN test.wg_people_household AS t2
    ON t2.ResidentID = t1.ID;

去重操作(按身份证号分组,只保留 CreateDate 最新的记录):

-- Step 2: deduplicate by ID-card number, keeping the newest record for each IDCard.
-- A row is deleted when another row with the same IDCard has a strictly later CreateDate.
-- Consequences of this rule:
--   * rows tied on the latest CreateDate are all kept (the staging table has no unique row id
--     to break ties; add an AUTO_INCREMENT column if exact ties must also be collapsed);
--   * rows whose CreateDate or IDCard is NULL (e.g. residents without a household record)
--     are kept, because a comparison with NULL is never true.
-- The multi-table DELETE form is used because it may join the target table to itself, and
-- because MySQL before 8.0.16 rejects an alias in a single-table DELETE.
-- NOTE(review): on millions of rows an index on IDCard speeds this up considerably, e.g.
--   ALTER TABLE test.temp_people_resident ADD INDEX idx_idcard (IDCard);
DELETE older
FROM test.temp_people_resident AS older
INNER JOIN test.temp_people_resident AS newer
    ON newer.IDCard = older.IDCard
   AND newer.CreateDate > older.CreateDate;

或者按照 id 去重(同一身份证号 + 网格编码下只保留 id 最大的一条):

-- Alternative dedup by id: for each (ID_CARD, GRID_CODE) pair keep only the row with the
-- largest id and delete the rest. GROUP BY puts NULL ID_CARD / GRID_CODE values in one group.
-- The inner SELECT is wrapped in a derived table because MySQL does not let a DELETE read
-- its own target table directly in a subquery (error 1093).
-- NOTE(review): assumes id is never NULL (e.g. the primary key); if any group's MAX(id)
-- were NULL, NOT IN would match no row and nothing would be deleted.
DELETE FROM base_household_population_final
WHERE id NOT IN (
    SELECT keeper.max_id
    FROM (
        SELECT MAX(id) AS max_id
        FROM base_household_population_final
        GROUP BY ID_CARD, GRID_CODE
    ) AS keeper
);

逐个关联更新其他表:

-- Step 3: fill the placeholder columns, one lookup table per statement.
-- Each UPDATE intentionally covers the whole staging table (no WHERE clause); because of the
-- LEFT JOIN, rows without a match get NULL in the target columns.

-- Grid code and grid name from the organization table.
UPDATE test.temp_people_resident AS p
LEFT JOIN test.organize AS org
    ON org.id = p.AreaID
SET p.GRID_CODE = org.zoningCode,
    p.GRID_NAME = org.Name;

-- Household id (HhID) and relation (Yhzgx) from the household-member table.
-- NOTE(review): the one-shot query joins this table on the resident's own id (a.id), but the
-- staging table only carries ResidentID from the household table, which is NULL for residents
-- without a household row — those residents get no HhID/Yhzgx/HuHao here. Consider carrying
-- t1.ID into the staging table and joining on it instead.
UPDATE test.temp_people_resident AS p
LEFT JOIN test.wg_hjyrkglb AS member
    ON member.ResidentID = p.ResidentID
SET p.HhID  = member.HhID,
    p.Yhzgx = member.Yhzgx;

-- Household number, looked up through the HhID set by the previous statement (order matters).
UPDATE test.temp_people_resident AS p
LEFT JOIN test.WG_RegisterFamily AS family
    ON family.ID = p.HhID
SET p.HuHao = family.HuHao;

-- House and building ids from wg_bcell, matched on Bcellid (the resident's HouseId).
UPDATE test.temp_people_resident AS p
LEFT JOIN test.wg_bcell AS cell
    ON cell.Bcellid = p.Bcellid
SET p.HOUSE_ID    = cell.HOUSE_ID,
    p.BUILDING_ID = cell.BUILDING_ID;

至此汇总完毕。

2、场景二:通过存储过程和数据库定时任务自动执行数据倒库的工作

新建存储过程:

利用 MySQL 的 INSERT ... ON DUPLICATE KEY UPDATE 语法(主键或唯一键冲突时改为更新已有记录),将数据导入指定表:

-- Procedure: copy disputes updated in the last 24 hours from
-- sd_unicom_node_bak.BASE_DISPUTE_PROCESS into zbzf_data_bak.events_business.
-- Rows that hit an existing primary/unique key are updated in place through
-- ON DUPLICATE KEY UPDATE instead of being inserted again.
-- NOTE(review): assumes guid (or event_id) is the unique key of events_business — confirm.
CREATE DEFINER=`root`@`%` PROCEDURE `event_unicom_to_dsj`()
BEGIN
    INSERT INTO zbzf_data_bak.events_business (
        guid,
        event_id,
        event_code,
        event_name,
        event_class_code,
        event_reason,
        event_description,
        event_time,
        event_alarm_time,
        event_submit_time,
        event_region_code,
        event_detail_address,
        event_longitude,
        event_latitude,
        event_level_code,
        event_status_code,
        event_has_attachment,
        event_submiter,
        event_submit_phone,
        event_submit_orgcode,
        event_submit_orgname,
        event_submit_source,
        create_time
    )
    SELECT
        -- 'zblt' + up to 20 chars of EVENT_ID starting at the 4th char + report time in Unix seconds
        CONCAT('zblt', SUBSTRING(t1.EVENT_ID, 4, 20), UNIX_TIMESTAMP(t1.REPORT_TIME)) AS guid,
        -- at most the first 36 characters; shorter values pass through unchanged
        LEFT(t1.EVENT_ID, 36) AS event_id,
        -- region(8) + class code + run date (yyyymmdd) + category + last 6 digits of report Unix time + 1
        -- NOTE(review): CURDATE() is the run date, not the report date, so an event reprocessed on a
        -- later day receives a different event_code — confirm this is intended.
        CONCAT(
            LEFT(t1.GRID_CODE, 8),
            getEventCode(t1.EVENT_CATEGORY),
            REPLACE(CURDATE(), '-', ''),
            t1.EVENT_CATEGORY,
            RIGHT(UNIX_TIMESTAMP(t1.REPORT_TIME), 6),
            1
        ) AS event_code,
        t1.EVENT_NAME,
        getEventCode(t1.EVENT_CATEGORY) AS event_class_code,
        t1.EVENT_DESCRIPTION AS event_reason,
        t1.EVENT_DESCRIPTION AS event_description,
        t1.REPORT_TIME       AS event_time,
        t1.HAPPEN_DATE       AS event_alarm_time,
        t1.COMPLETE_TIME     AS event_submit_time,
        LEFT(t1.GRID_CODE, 8) AS event_region_code,
        t1.HAPPEN_PLACE      AS event_detail_address,
        t1.LNG               AS event_longitude,
        t1.LAT               AS event_latitude,
        t1.COMPLETE_LEVEL    AS event_level_code,
        -- 'Y' -> '4', 'N' -> '1', anything else (including NULL) -> '2'
        CASE t1.SUCCESSFUL_OR_NO
            WHEN 'Y' THEN '4'
            WHEN 'N' THEN '1'
            ELSE '2'
        END AS event_status_code,
        0 AS event_has_attachment,
        -- MySQL string positions start at 1; starting at position 0 would always yield ''
        SUBSTRING(t1.REAPORTPERSON, 1, 20) AS event_submiter,
        t2.PHONENUMBER AS event_submit_phone,
        t1.GRID_CODE   AS event_submit_orgcode,
        t1.GRID_NAME   AS event_submit_orgname,
        1100           AS event_submit_source,
        t1.UPDATE_DATE AS create_time
    FROM sd_unicom_node_bak.BASE_DISPUTE_PROCESS AS t1
    LEFT JOIN sd_unicom_node_bak.BASE_GRID_USER AS t2
        ON t2.account = t1.REAPORTPERSON
    -- only rows changed during the last 24 hours
    WHERE t1.UPDATE_DATE > DATE_SUB(NOW(), INTERVAL 1 DAY)
    ON DUPLICATE KEY UPDATE
        EVENT_ID             = VALUES(EVENT_ID),
        event_code           = VALUES(event_code),
        event_name           = VALUES(event_name),
        event_reason         = VALUES(event_reason),
        event_description    = VALUES(event_description),
        event_time           = VALUES(event_time),
        event_alarm_time     = VALUES(event_alarm_time),
        event_submit_time    = VALUES(event_submit_time),
        event_region_code    = VALUES(event_region_code),
        event_detail_address = VALUES(event_detail_address),
        event_level_code     = VALUES(event_level_code),
        event_submiter       = VALUES(event_submiter),
        event_submit_phone   = VALUES(event_submit_phone),
        event_submit_orgcode = VALUES(event_submit_orgcode),
        event_submit_orgname = VALUES(event_submit_orgname),
        event_submit_source  = VALUES(event_submit_source),
        create_time          = VALUES(create_time),
        event_longitude      = VALUES(event_longitude),
        event_latitude       = VALUES(event_latitude),
        event_class_code     = VALUES(event_class_code),
        event_status_code    = VALUES(event_status_code),
        event_has_attachment = VALUES(event_has_attachment);
END

新建存储过程中用到的getEventCode()函数:

-- getEventCode: translate an event-category code into a 5-digit event class code.
-- The parameter is INTEGER, so a category string such as '0101' arrives as 101. Each WHEN then
-- compares that integer with a string literal numerically ('0101' = 101), which is why leading
-- zeros in the codes below do not matter. Unknown categories and NULL fall through to 41699.
-- DETERMINISTIC / NO SQL: the result depends only on the argument and no table is read.
-- Declaring this lets CREATE FUNCTION succeed while binary logging is enabled; otherwise MySQL
-- raises error 1418 unless log_bin_trust_function_creators=1.
-- The body is a single RETURN statement, so no DELIMITER change is needed in the mysql client.
CREATE DEFINER=`root`@`%` FUNCTION `getEventCode`(myNum INTEGER) RETURNS int(11)
    DETERMINISTIC
    NO SQL
RETURN CASE myNum
    WHEN '1'    THEN 41100
    WHEN '0101' THEN 41101
    WHEN '0102' THEN 41102
    WHEN '0103' THEN 41199
    WHEN '0199' THEN 41199
    WHEN '2'    THEN 41304
    WHEN '0201' THEN 41304
    WHEN '0202' THEN 41304
    WHEN '0203' THEN 41304
    WHEN '0204' THEN 41304
    WHEN '0205' THEN 41603
    WHEN '0206' THEN 41304
    WHEN '3'    THEN 41200
    WHEN '0301' THEN 41201
    WHEN '0302' THEN 41201
    WHEN '0303' THEN 41202
    WHEN '0304' THEN 41203
    WHEN '0305' THEN 41204
    WHEN '0306' THEN 41205
    WHEN '0307' THEN 41206
    WHEN '0308' THEN 41207
    WHEN '0309' THEN 41208
    WHEN '0310' THEN 41209
    WHEN '0311' THEN 41210
    WHEN '0312' THEN 41211
    WHEN '0313' THEN 41211
    WHEN '0314' THEN 41299
    WHEN '0315' THEN 41299
    WHEN '0399' THEN 41299
    WHEN '4'    THEN 41300
    WHEN '0401' THEN 41301
    WHEN '0402' THEN 41301
    WHEN '0403' THEN 41301
    WHEN '0404' THEN 41301
    WHEN '0405' THEN 41302
    WHEN '0406' THEN 41214
    WHEN '0407' THEN 40101
    WHEN '0408' THEN 49901
    WHEN '0499' THEN 49901
    WHEN '5'    THEN 41400
    WHEN '0501' THEN 41401
    WHEN '0502' THEN 41402
    WHEN '0503' THEN 41499
    WHEN '0599' THEN 41499
    WHEN '6'    THEN 21500
    WHEN '0601' THEN 21500
    WHEN '7'    THEN 41500
    WHEN '0701' THEN 41501
    WHEN '0702' THEN 41502
    WHEN '0703' THEN 41504
    WHEN '0704' THEN 41305
    WHEN '0799' THEN 41599   -- compares equal to 799, exactly like the former '799'
    WHEN '8'    THEN 49900
    WHEN '0801' THEN 49900
    WHEN '0802' THEN 49900
    WHEN '0899' THEN 49900
    WHEN '9'    THEN 41600
    WHEN '0901' THEN 41601
    WHEN '0902' THEN 41602
    WHEN '0903' THEN 41699
    WHEN '0904' THEN 41604
    WHEN '0999' THEN 41699
    ELSE 41699
END;

创建定时任务,每 6 小时执行一次(需先开启事件调度器 event_scheduler,否则任务不会执行):

-- Create (or re-create) the scheduled job that calls event_unicom_to_dsj() every 6 hours.
-- MySQL comments start with `--` or `#`; `//` is a syntax error.
-- The job only fires while the event scheduler is ON (event_scheduler=ON).
DROP EVENT IF EXISTS event_unicom_to_dsj;
CREATE EVENT event_unicom_to_dsj
    ON SCHEDULE EVERY 6 HOUR STARTS TIMESTAMP '2022-04-01 01:00:00'
    -- keep the event definition after its schedule ends instead of dropping it
    ON COMPLETION PRESERVE
    -- a single CALL needs no BEGIN ... END, so no DELIMITER change is required in the mysql client
    DO CALL event_unicom_to_dsj();

MySQL 开启 binlog 和事件调度器(先查看当前的 sql_mode 及相关变量):

-- Show the current session sql_mode before editing my.cnf.
SELECT @@sql_mode;
-- Check whether binary logging and the event scheduler are enabled (re-run after restarting MySQL).
SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'server_id', 'event_scheduler');

在 my.cnf 的 [mysqld] 段中修改如下(修改后需重启 MySQL 服务才能生效):

# These options belong in the [mysqld] section of my.cnf; restart mysqld after editing.
# NOTE(review): NO_AUTO_CREATE_USER exists only up to MySQL 5.7 — MySQL 8.0 refuses to start with it, so drop it there.
sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
# Enable the binary log
binlog_format=row                      # row-based logging
binlog_checksum=none                   # none = no CRC checksum on binlog events; crc32 = with CRC checksum
log_bin = /var/lib/mysql/mysqlr-bin    # enables the binlog; base path/name of the log files
# MySQL 5.7 will not start with log_bin set but no server_id; skip this line if server_id is already set elsewhere.
server_id=1

# Turn on the event scheduler so CREATE EVENT jobs run (option files take no trailing ';').
event_scheduler=ON

新建 MySQL 用户,并授权其只能访问指定数据库:

-- Create the account (any host), then grant it all privileges on the zd_node_bak schema only.
-- NOTE(review): this password is published in plain text here; change it before real use.
CREATE USER 'zbzd_user'@'%' IDENTIFIED BY 'PuYkpt!@956';

-- The password is already set by CREATE USER; GRANT ... IDENTIFIED BY is deprecated in
-- MySQL 5.7 and a syntax error in 8.0, so it is omitted.
GRANT ALL PRIVILEGES ON zd_node_bak.* TO 'zbzd_user'@'%';

-- Not required after CREATE USER / GRANT (they reload privileges themselves); harmless.
FLUSH PRIVILEGES;

新建 Linux 用户并赋予 root(sudo)权限:

# Create the Linux account "xindian".
useradd xindian

# Set the new account's password (prompts interactively).
passwd xindian

使用 visudo 命令编辑 /etc/sudoers 文件(visudo 会在保存前检查语法,避免改错后 sudo 无法使用),找到下面一行:

## Allow root to run any commands anywhere

# Existing rule: root may run any command, as any user, on any host.
root ALL=(ALL) ALL

在root下面添加一行,如下所示:

# Added rule (same form as root's): xindian may run any command as any user via sudo; no NOPASSWD, so sudo asks for xindian's password.
xindian  ALL=(ALL) ALL

以上是关于Mysql数据 抽取(extract)转换(transform)加载(load)实战的主要内容,如果未能解决你的问题,请参考以下文章

Mysql数据 抽取(extract)转换(transform)加载(load)实战

json数据怎么抽取实体

相关英语简称

etl是啥

黑马程序员《数据清洗》学习笔记CSVJSON数据抽取

特征抽取: sklearn.feature_extraction.FeatureHasher