hive缓慢变化维

Posted zeroLinked

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hive缓慢变化维相关的知识,希望对你有一定的参考价值。

维度建模的数据仓库中,有一个概念叫Slowly Changing Dimensions,中文一般翻译成”缓慢变化维”,经常被简写为SCD。缓慢变化维的提出是因为在现实世界中,维度的属性并不是静态的,它会随着时间的流失发生缓慢的变化。这种随时间发生变化的维度我们一般称之为缓慢变化维,并且把处理维度表的历史变化信息的问题称为处理缓慢变化维的问题,有时也简称为处理SCD的问题。

  • 针对的需求:
    • 表中的部分字段会被更新
    • 需要查看某一个时间点或者时间段的历史快照信息
    • 变化的比例和频率不是很大

举个例子

我们在业务表中,比如工单(workorder)这张表,由于从工单创建到工单归档这中间需要很多处理人进行处理,处理时长也相对较长,我们从关系型数据到hive或hbase中就可能需要用到缓慢变化维。

比如,我们按照一天的间隔对数据进行抽取。第一天,mysql数据表如下:

idstatuscreateTimeupdateTime
112022-11-052022-11-05
222022-11-052022-11-05
312022-11-052022-11-05
422022-11-052022-11-05

到了第二天,MySQL中数据表如下:

idstatuscreateTimeupdateTime
112022-11-052022-11-05
232022-11-052022-11-06
312022-11-052022-11-05
422022-11-052022-11-05
522022-11-062022-11-06
622022-11-062022-11-06

很显然,第二天对id为2的数据进行了更新,且新增了id为5和6的两条新的数据。
对于在hive中,我们对于抽取过来的数据,需要做进一步的处理:

  • 针对当日抽取得到的数据进行分区(ods层)
  • 再建立dw层数据,即从ods层到dw层的数据处理,我们赋予两个新的字段:startTime和endTime
    • startTime,就是当日抽取的时间
    • endTime,就是我们数据的结束时间,对于新的数据,我们用9999-12-31表示,对于旧的数据,我们可以将其为抽取那天

实现过程

1、模拟数据

建立工单表,这里只做demo演示,所以只抽取更新的字段

在mysql中插入数据,表示为第一天以来的全部数据

create table if not exists workorder( 
id varchar(50), 
status varchar(50), 
createtime varchar(50), 
updatetime varchar(50)
) ; 
insert into workorder(id , status , createtime, updatetime) values
('1', '1', '2022-11-05', '2022-11-05'),
('2', '2', '2022-11-05', '2022-11-05'),
('3', '1', '2022-11-05', '2022-11-05'),
('4', '2', '2022-11-05', '2022-11-05');

2、导入第一天的数据

在hive中,建立ods层的表,并分区,导入第一天数据

-- 创建ods层表
create table if not exists ods_workorder(
  id string,
  status string,
  createtime string,
  updatetime string
)
partitioned by (dt string)
row format delimited fields terminated by '\\t';
-- 新增表分区
alter table ods_workorder add  partition (dt='2022-11-05');
-- 创建dw层表
create table if not exists dw_workorder(
  id string,
  status string,
  createtime string,
  updatetime string,
  starttime string,
  endtime string
)
row format delimited fields terminated by '\\t';

使用sqoop导入第一天数据到hive的ods层

sqoop import \\
--connect jdbc:mysql://localhost:3306/test \\
--username root \\
--password 123456 \\
--table workorder \\
--m 1  \\
--delete-target-dir \\
--fields-terminated-by '\\t' \\
--target-dir /user/hive/warehouse/ods_workorder/dt=2022-11-05

将第一天数据从ods导入dw层,由于是第一天数据,endtime默认是9999-12-31

insert overwrite table dw_workorder
select
  id string,
  status string,
  createtime string,
  updatetime string,
  updatetime as starttime, 
  '9999-12-31' as endtime
from
  ods_workorder
where
  dt = '2022-11-05';

3、导入第二天数据

第二天,mysql中新增两条并更新了一条

UPDATE workorder SET status = '3', modifytime = '2022-11-06' WHERE id = '3';
INSERT INTO workorder(id, status, createtime, updatetime) VALUES
('5', '2', '2022-11-06', '2022-11-06'),
('6', '2', '2022-11-06', '2022-11-06');

使用sqoop进行增量导入

sqoop import \\
--connect jdbc:mysql://localhost:3306/test \\
--username root \\
--password 123456 \\
--target-dir /user/hive/warehouse/ods_workorder/dt=2022-11-06 \\
--query "select * from workorder where updatetime = '2022-11-06' and \\$CONDITIONS" \\
--delete-target-dir \\
--fields-terminated-by '\\t' \\
--m 1 

这时候,在dw层中,我们需要新增一张表,用于计算endtime,同时,它存储在dw层中对于dw_workorder的“前面的数据”,在hive中

create temporary  table if not exists dw_tmp_workorder(
  id string,
  status string,
  createtime string,
  updatetime string,
  starttime string, 
  endtime string  
)
row format delimited fields terminated by '\\t';

关键代码处理:将更新的旧数据设置endtime为2022-11-05,表示生效时间为当天,新的数据的endtime字段更新为9999-12-31,标识为最新的数据

insert overwrite table  dw_tmp_workorder
select
  t1.id,                
  t1.status,            
  t1.createtime,              
  t1.updatetime,              
  t1.starttime,           
  case when (t2.id is not null and t1.endtime = '9999-12-31' )
  then '2022-11-05'
  else t1.endtime
  end as endtime 
from
  dw_workorder t1
  left join
  (select * from ods_workorder where dt='2022-11-06') t2
   on t1.id = t2.id
union all
select
  id,               
  status,          
  createtime,              
  updatetime,             
  updatetime as starttime, 
   '9999-12-31' as endtime 
from
  ods_workorder where dt='2022-11-06';

最后我们将tmp表中的数据插入到dw_workorder中

insert overwrite table  dw_workorder 
select * from dw_tmp_workorder;

至此,我们的数据就处理完成了

查询旧数据:

 select * from dw_workorder where starttime <=  '2022-11-05' and endtime >= '2022-11-05' ;

查询最新数据:

 select * from dw_workorder where endtime = '9999-12-31' ;

以上是关于hive缓慢变化维的主要内容,如果未能解决你的问题,请参考以下文章

hive数仓中缓慢变化维

大数据学习(三十一)数据仓库如何处理缓慢变化维

SCD缓慢变化维拉链表

关于缓慢变化维

深入解析数据仓库中的缓慢变化维

12.缓慢变化维处理技术