Hive优化

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive优化相关的知识,希望对你有一定的参考价值。

参考技术A

(二)数据倾斜的解决方案
1、参数调节
hive.map.aggr=true
Map 端部分聚合,相当于Combiner

hive.groupby.skewindata=true
有数据倾斜的时候 进行负载均衡 ,当选项设定为true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中 ,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

2、SQL语句调节
Join:
关于驱动表的选取:选用join key分布最均匀的表作为驱动表

做好列裁剪和filter操作,以达到两表做join的时候,数据量相对变小的效果。

大表与小表Join
使用map join让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。

大表与大表Join:
把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。

count distinct大量相同特殊值:
count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。

group by维度过小:
采用sum() 与group by的方式来替换count(distinct)完成计算。

特殊情况特殊处理:
在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理,最后union回去。

3、应用场景
(一)空值产生的数据倾斜
场景:
如日志中,常会有信息丢失的问题,比如日志中的user_id,如果取其中的user_id和用户表中的user_id 关联,会碰到数据倾斜的问题。

解决方法1 : user_id为空的不参与关联

解决方法2 :赋与空值新的key值

结论:
方法2比方法1效率更好,不但io少了,而且作业数也少了。
解决方法1中 log读取两次,job是2。
解决方法2中 job数是1 。这个优化适合无效 id (比如 -99 , ”, null 等) 产生的倾斜问题。把空值的key变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上,解决数据倾斜问题。

(二)不同数据类型关联产生数据倾斜
场景:用户表中user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时,默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Reducer中。

解决方法 :把数字类型转换成字符串类型

(三)小表不小不大,怎么用 map join 解决倾斜问题
使用 map join 解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到map join会出现bug或异常,这时就需要特别的处理。
解决如下:

users 表有 600w+ 的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。
解决方法:

假如,log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。

Hive入门Hive优化

拉链表

解决方案

如果已经采集的事务事实数据维度状态发生了变化,如何解决数据存储的问题?

覆盖

直接用新的状态覆盖老状态,会导致之前的过程信息丢失。不选用。

时间标记

通过时间来标记数据的每个状态,都保存下来。
通过时间来标记每个状态的存活周期:
startTime:这个状态的开始时间。
endTime:这个状态的结束时间。
最新状态的标记:将endTime设置为9999-12-31 23:59:59来标记当前最新状态。

这么做就可以根据时间来获取对应时间范围的状态:

select * from 表名 where endTime = '9999-12-31'

增加列

通过增加列的方式来标记每个状态,一般不选用,用于状态变化固定的情况(如:一步一步确认操作)。

实现流程

增量采集所有新增(Insert插入的数据)的和更新(Update更新的数据)的数据到更新表(也就是Update表)中。

将原来的拉链表与更新表的数据进行合并(合并过程中:对需要更新的数据进行修改,修改之前最新状态的endtime从9999-12-31更改为本次最新状态的starttime的前一天【T+1离线的情况】)得到最新的拉链表,存放在临时表中(也就是Tmp表)。

将临时表合并的结果覆盖到拉链表中:

INSERT OVERWRITE TABLE 拉链表名 partition (start_time) 
SELECT * from 临时表名;

Hive索引

Hive0.7开始,在3.0版之前都支持索引。3.0之后的版本不再支持索引。

create index 索引名 on 表名(索引字段);

索引的使用

Hive索引官方指南

Hive的索引对Hive中的数据对应的文件路径、文件中的偏移量构建索引信息,做过滤查询加快MapReduce读取数据的性能。本质还是通过一个MapReduce对所有数据构建索引,将索引信息存储在索引表中。

索引的问题

Hive的索引不会自动更新,后插入的数据并没有索引。只能强制手动更新索引(通过命令走MapReduce程序手动更新索引数据),管理起来并不省心。

ORC索引

ORC文件类型

ORC文件是列式存储,对于列的处理更加友好。相同的数据,占用的空间更小。支持基于数据内容构建文件级别索引。ORC文件中保存了三个层级的统计信息,分别为文件级别stripe级别row group级别的。它们都可以用来根据Search ARGuments(谓词下推条件)判断是否可以跳过某些数据,在统计信息中都包含成员数和是否有null值,并且对于不同类型的数据设置一些特定的统计信息。

ORC文件的内部存储结构:ORC每部存储是按照Strip划分存储的。多个Strip的每个Strip中存储真实的数据。

Row Group Index

将每一列在这个Strip中对应最大值和最小值进行记录,当用户进行比较查询时,可以通过记录的最大与最小值判断查询的数据是否在这个Strip中。如果在,读取Strip,如果不在就直接跳过,不读取Strip。

做范围比较,一般用唯一标识(unique)的那一列,例如id等来进行排序,作为查询条件。

创建表时,需要指定开启row group index:

create  表名()
stored as orc (’orc.create.index=true)

查询数据时,需要开启row group index 过滤查询:

hive.optimize.index.filter=true

想实现基于范围的查询索引过滤,必须由用户自己保证写入orc的数据是基于查询条件有序的:

insert into 表名2
select id,字段名 from 表名1 sort by id 

按照这个排序的字段做过滤,就可以走行组索引,实现范围过滤。

Boom Fitter Index

布隆过滤索引

row group :范围过滤,根据某列排序的结果做范围比较。
bloom filter:等值过滤。

一般的查询条件

时间:直接是分区字段,直接使用分区过滤。
唯一标识符:基于row group index。
如果想要把其他的列作为查询条件,需要指定:为某些列在Strip中构建索引。

根据指定的列,在生成ORC文件时,在每个Strip中构建这一列对应的再这个Strip中的所有值,当进行等值判断时,直接读取对应的索引进行判断,如果在这个Strip中,就读取,如果不在,就跳到下一个Strip,降低数据读取的IO。

创建表的时候,指定为某一列创建索引:

create table () stored as orc (”orc.bloom.filter.columns=”列名称”)
create table () stored as orc (”orc.bloom.filter.columns=”name”)

小文件处理

如何解决底层MapReduce对小文件处理性能比较差的情况?

避免产生小文件:如果产生了就合并存储。
处理数据时提前合并小文件。

如果真的产生了很多小文件,MapReduce中输入类也有措施:
TextInputFormat:按照块的1.1倍大小来划分分片。
CombinerFileInputFormat:将所有文件进行合并以后,再分片。

#设置Hive中底层MapReduce读取数据的输入类:将所有文件合并为一个大文件作为输入
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

#如果hive的程序,只有maptask,将MapTask产生的所有小文件进行合并
set hive.merge.mapfiles=true;
#如果hive的程序,有Map和ReduceTask,将ReduceTask产生的所有小文件进行合并
set hive.merge.mapredfiles=true;
#每一个合并的文件的大小
set hive.merge.size.per.task=256000000;
#平均每个文件的大小,如果小于这个值就会进行合并
set hive.merge.smallfiles.avgsize=16000000;

这样配置,如果多个MapTask的平均大小不足16M,就会自动启动一个MapReduce将结果就行合并。

其它属性优化

矢量化查询

按照批次对数据进行读取查询:
不开启:Hive读一条处理一条。
开启:Hive每个批次读取1024条,处理1024条。

set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;

零拷贝

数据在操作系统中,从内存读取时不用经过多次拷贝,直接读取(正常情况数据加载到内存条时还会有拷贝。。。CPU中也有3级Cache。很多情况需要多次拷贝到缓存才能被读取到):
不开启:必须在内存中经过多次交换才能读取到数据。
开启了:数据可以直接从内存中读取。

set hive.exec.orc.zerocopy=true;

关联优化器

Hive在解析SQL语句,转换为MapReduce时候,可以将相关联的部分合并在一起执行。
例如:

select deptno,avg(salary) as avgsal from table group by deptno order by deptno;

不开启:先走第一个MapReduce进行分组,结果保存在磁盘,再走第二个MapReduce进行排序,得到最终的结果。
开启了:自动判断所有执行过程语法数是否有重合的过程,有就放在一起执行。这样只需要启动一个MapReduce,就可以得到最终的结果

以上是关于Hive优化的主要内容,如果未能解决你的问题,请参考以下文章

Hive优化之Hive的配置参数优化

Hive与优化方法

Hive与优化方法

Hive与优化方法

Hive优化

Hive优化