Hive拉链表

Posted 虎鲸不是鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive拉链表相关的知识,希望对你有一定的参考价值。

Hive拉链表

背景

笔者在将DataStage任务翻写为Hive On Tez任务时,遇到一个拉链表,实在是头大,特此将脱敏后的套路及心得记录下来,以备后续翻阅。

原理

DataStage

脱敏后大致如图所示。首先从异构数据源获取新数据。然后和Oracle的结果表做left join,关联失败的当然是要插入到结果表的新数据,不解释。关联成功后,要去判断非主键的字段是否发生了变化,如果没有变化,当然是还要保留;如果发生了变化,就要对历史数据做Update,并且将变更后的新数据给插入到结果表。古人用DataStage操作的还是相当的熟练,奈何Oracle大势已去,现在是大数据的天下了。。。

HQL

HQL不像Spark和Flink那样可以从异构数据源做联邦查询。但是异构数据源的问题好办,统一入湖到Hive的ODS即可,就可以一句HQL读到数据。

由于结果表不是那种ACID的Orc表,Parquet表就没有办法去Update了。但是问题总还是需要解决的。

可以考虑全量覆盖的方式,将保留不变的历史数据、新增的数据、修改后的数据、修改后补充插入的新数据这4部分累加起来,就是所需的结果了。但是从没有分区的Hive表读数据再回灌回去有个问题。。。

由于做insert overwrite时,会在location指定的hdfs路径生成一个./tmp的中间路径,写完后,会删除老的Parquet文件,再去rename将./tmp的parquet“搬”回来。当删除老的Parquet文件,还没有来得及rename时MR任务失败,这种小概率事件有可能导致数据发生丢失,就需要手动敲命令抓trash或者手动rename,具有一定的风险,而且不利于发生异常时重跑和修数据,所以要采用类似Java中2PC【两阶段提交】的模式,先将结果灌入到tmp表,再从tmp表回灌到结果表。这么做可以保障事务,当HQL任务跑失败时可以回滚。就可以方便后续SQL Boy们运维时重跑任务。如果数据全部是从上游运算出来的,那么这种不保留历史数据的情况,大不了重跑,直接从结果表读数据运算后写回去也没啥太大的问题。

遇到的问题

Caused by:ong.apache.hadoop.hive.gl.parse.SemanticException:Column 代理键SK Found in more than OneTables/Subauenies --所以使用如下方式:insert overwrite覆盖历史数据,再insert into追加新数据

由于HQL写的太长了,可能遇到了解析的问题,所以解析失败,但是4个tmp单独都是能跑出数据的。所以笔者灵机一动,采用先overwrite覆盖历史数据,再去insert into追加数据的方式。由于迁移平台一定要迁移历史数据,那么这样也可以保证重跑任务时的幂等性。

伪代码

具体的业务不同,大体上拉链表都是这样的。

脱敏后大体如下:

with tmp_new as(--获取到异构数据源的新数据
	select
    	需要的字段
    from
    	统一入湖到ODS的Hive表1
    where
    	某些条件
    
    union all
    
    select
    	需要的字段
    from
    	统一入湖到ODS的Hive表2
    where
    	某些条件
    
    union all
    
    select
    	需要的字段
    from
    	统一入湖到ODS的Hive表3
    where
    	某些条件
),
tmp_self1 as(--从自己取数
	select
		需要的字段,
		代理键sk
	from
		result_table t1
),
tmp_main as(--join出主数据
	select
		t1.需要的字段,
		t2.需要的字段
	from
		tmp_new t1
	left join
		tmp_self1 t2
		on t1.主键=t2.主键
),
tmp_change as(--获取到发生变化的数据
	select
		需要的字段,
		t2.需要的字段,
		t2.代理键sk
	from
		tmp_main t1
	where
		代理键sk is not null	--tmp_self1的代理键
		and(
			新的非主键字段值!=对应的原来的非主键字段值
        )
),
tmp_exists_no as(--join失败的数据就是结果表不存在的数据,后续insert用
	select
		需要的字段,
		t2.需要的字段
	from
		tmp_main t1
	where
		代理键sk is null
),
tmp_exists_no_ins as(--实际插入的数据
	select
		需要的字段,
		1 as flg_是否有效,
		昨天 as 开始的有效日期,
		to_date('9999-12-31') as 结束日期,
    	case
    		when 最大的sk is null then 代理键sk,
    		else 代理键sk+最大的sk
    		end as 代理键sk    	
	from(
		select
        	需要的字段,
        	row_number() over(order by 随便一个字段) as 代理键sk--后续生成实际的代理键时要用
        from(
        	select
            	t3.需要的字段,
            	t3.代理键sk,
            	t4.最大的sk
            from(
				select
                	t1.需要的字段,
                	t2.代理键sk_self
				from
                	tmp_exists_no t1
				left join(--需要改动的数据
                	select
                    	主键,
                    	代理键sk as 代理键sk
                    from
                    	result_table
                    where
                    	结束日期=前天
                	) t2
                	on t1.主键=t2.主键
            	)t3,
            	(
                	select
                    	cast(max(代理键sk) as int) as 最大的sk
                    from
                    	result_table
                )t4
        	)t5
    	) t6
),
tmp_change_update as(--发生变化的数据要更新
	select
		t1.代理键,
		t2.所需保留数据的字段
		前天 as 结束日期,--需要修改的字段
		0 as flg_是否有效,--需要修改的字段
		current_timestamp() as 末次时间戳
	from
		tmp_change t1,
		result_table t2
	where
		t1.主键=t2.主键
),
tmp_change_ins as(
    select
    	case
    		when 最大的sk is null then 代理键sk,
    		else 代理键sk+最大的sk
    		end as 代理键sk,
    	需要的字段,
    	昨天 as 开始的有效日期,
    	current_timestamp() as 加载时间戳,
    from(
        select
        	需要的字段,
        	最大的sk,
        	row_number() over(order by 随便一个字段) as 代理键sk--生成从1开始的连续自然数
        from(
        	select
            	t1.需要的字段,
            	t2.最大的sk
            from
            	tmp_change t1,
            	(
            		select
                    	cast(max(代理键sk) as int) as 最大的sk
                    from(
                    	select
                        	代理键sk
                        from
                        	result_table
                        
                        union all
                        
                        select
                        	代理键sk
                        from
                        	tmp_exists_no_ins	--因为是先插入这部分,才去插入修改后的数据。当然也可以换个顺序,这一坨就移到tmp_exists_no_ins中
                    ) t1
            	) t2
        )t3
    )t4
),
tmp_his as(--需要保留的历史数据
select
	需要的字段
from
	result_table
where
	代理键sk not in (--不需要update的数据就是需要保留的历史数据,代理键sk唯一确定数据,可以当主键用
    	select
        	代理键sk
        from
        	tmp_change_update
    )
)
insert overwrite table tmp表	--一定有历史数据
select
	需要的字段
from
	tmp_his
;

--这里在ctrl v一大坨一毛一样的HQL
insert into table tmp表	--追加新insert的数据
select
	需要的字段
from
	tmp_change_update
	
union all

select
	需要的字段
from
	tmp_change_ins
	
union all

select
	需要的字段
from
	tmp_exists_no_ins
;

由于笔者不是专业的SQL Boy,实在是没有能力一句insert 写完,故按照Spark的那种一步一个DataFrame的方式,HQL中一步一个tmp。。。

如果解决了Found in more than OneTables/Subauenies的问题,其实HQL可以短一点。。。

广大学徒工们可以参照笔者的套路,保持大体结构,修改业务不相同的部分,即可开发出拉链表的Hive On Tez任务。

转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/129679071

以上是关于Hive拉链表的主要内容,如果未能解决你的问题,请参考以下文章

大数据Hive3.x数仓开发数仓中数据发生变化如何实现数据存储--拉链表详解

hive中拉链表的设计

Hive拉链表实战-SQL模拟hive仓库拉链表实现

大数据之Hive:hive中的if函数

Hive拉链表

Hive数据模型之历史拉链表