HIVE优化和数据倾斜合并小文件

Posted 吃再多糖也不长胖

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HIVE优化和数据倾斜合并小文件相关的知识,希望对你有一定的参考价值。

HIVE优化和数据倾斜、合并小文件

执行计划(explain)

EXPLAIN [EXTENDED(详细) | DEPENDENCY | AUTHORIZATION] query

没有生成MR任务的explain

hive (default)> explain select * from emp;
Explain
STAGE DEPENDENCIES:
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        TableScan
          alias: emp
          Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            expressions: empno (type: int), ename (type: string), job (type: string), mgr (type: int), hiredate (type: string), sal (type: double), comm (type: double), deptno (type: int)
            outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
            Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
            ListSink

生成MR任务的explain

hive (default)> explain select deptno, avg(sal) avg_sal from emp group by deptno;
Explain
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1
STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: emp
            Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: sal (type: double), deptno (type: int)
              outputColumnNames: sal, deptno
              Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: sum(sal), count(sal)
                keys: deptno (type: int)
                mode: hash
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: int)
                  sort order: +
                  Map-reduce partition columns: _col0 (type: int)
                  Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col1 (type: double), _col2 (type: bigint)
      Execution mode: vectorized
      Reduce Operator Tree:
        Group By Operator
          aggregations: sum(VALUE._col0), count(VALUE._col1)
          keys: KEY._col0 (type: int)
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2
          Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            expressions: _col0 (type: int), (_col1 / _col2) (type: double)
            outputColumnNames: _col0, _col1
            Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Fetch 抓取

fetch抓取是指hive中对某些情况的查询可以不必使用mapreduce计算,例如select * from emp; 这种情况下hive可以简单读取emp对应的存储目录下的文件,然后输出结果到控制台。

在hive-default.xml.template文件中hive.fetch.task.conversion默认为more,老版本为minimal,修改为more后,在全局查找、字段查找、limit查找都不走mapreduce。

本地模式

hive的执行通常会触发整个hadoop集群的MR启动。有时候hive的输入量很小,这种情况下为了查询触发MR任务消耗的时候会比实际job的执行时间会多的多。
对大多数情况下,hive通过本地模式单机上处理所有的任务,对于小数据集,执行时间可以被明显缩短。

可以设置hive.exec.model.local.auto为true来启动这个优化。 同时设置set
hive.exec.mode.local.auto.inputbytes.max=50000000; 当输入量小于这个值的时候,启动单机。
或者设置set
hive.exec.mode.local.auto.input.files.max=10;当输入最大文件数小于这个数的时候,启动单机。

小表join大表(MapJoin)

旧版本:将key相对分散、且数据量小的表放在join左边,可以有效减少内存溢出错误的发生。然后再进一步使用map join 让小的维度表(小于1000条数据)先在map端完成join操作。
新版本:对小表join大表和大表join小表做了优化,小表放在左右两边都行。

开启mapjoin:set hive.auto.convert.join=true;默认是true。

mapjoin工作机制:
1.local task a 扫描小表,将数据转换为hash table写入到本地哈希文件中,然后等到下一阶段mr任务启动的时候,加入到distribute cache中缓存。
2.task b 是一个没有reduce的MR,启动MR,在map阶段 task扫描大表b,然后用大表b的每一条数据去跟distribute cache进行join,直接输出结果。
3.因为mapjoin没有reduce,由map端直接输出结果文件,所以由多少个task就有多少结果文件。

大表join大表

大表join大表 有时候join超时是因为大表中某些key数据太多了,相同的key发送到相同的reducer中,导致内存不足。
很多情况下,这些key对应的都是异常数据,我们需要在sql中进行过滤或者转换。

空值异常key需要过滤

如果空值是异常数据的话,需要过滤
join前加where xx is not null

空值非异常需要转换

有时候某个key空值很多,但是相对应的数据不是异常数据,需要要包含在join中,这时可以将表a的key为空的字段赋予一个随机的值,使得数据随即均匀的落到不同reducer中。
select a.* from tablea a full join bigtableb b on nvl(n.id,rand()) = b.id;

Group by 优化

默认情况下,map阶段的同一key值数据发送到同一个reducer中,当一个key数据过大时就出现数据倾斜问题。
但是并不是所有的聚合操作都必须在reducer端处理完成,很多聚合操作可以在map端进行部份聚合,然后在reduecr得出最后结果。
设置:

set hive.map.aggr = true; 是否在map端聚合
set hive.groupby.mappper.在map端聚合的数据条数
set hive.groupby.skewindata =true;有数据倾斜的时候开启负载均衡。

负载均衡group by 流程:
当选项为true的时候,生成的查询计划会有两个MR job。第一个job中,Map的输出结果会随机分配到reduce中,每个reduce做部分聚合,并输出结果。
这样的好处是相同的group by key 极有可能分发到不同的reduce中。达到负载均衡的目的。
第二个job再根据预处理的结果按照group by key 分布到reduce中,这个过程保证相同的group by key可以分布到同一个reduce,完成最终的聚合操作。
代码:

set hive.groupby.skewindata = true;
select deptno from emp group by deptno;

笛卡尔积

尽量避免笛卡尔积,join的时候不加on条件或者无效的on条件,hive只能用1个reducer处理笛卡尔积。

行列过滤

列处理:只拿需要的列,尽量分区使用,少用select *
行处理:先子查询,再关联表。避免全表关联。。

 select b.id from bigtable b
join (select id from bigtable where id <= 10 ) o on b.id = o.id;

合理设置map和reduce数

hive的作业通常会通过input的目录产生一个或者多个map任务
所以主要的决定因素有:input的文件总个数,input的文件大小,集群设置块block的大小。

是不是map数越多就越好

答案是否定的,一个任务如果有很多小文件(远远小于block大小128M),则每个小文件都会被当成一个block,用一个mapper进行处理,而一个map的启动和初始化的时间远远大于逻辑处理时间,就会造成很大的资源浪费,而且执行的mapper数是受限的。

是不是每个mapper 处理接近128M的文件块,就可以高枕无忧了。

不一定,如果一个文件大小接近128M,但是这个文件只有一两个字段,有几千万行的数据。正常用一个map去完成,如果map处理的逻辑复杂,用一个map处理也会耗时很多、

合理设置mapper数

复杂文件增加mapper数

input的文件很大时,任务逻辑复杂,map执行效率缓慢时候,可以考虑增加mapper数,使得每个mapper处理的数据量减少,提高任务效率。
增加mapper量方法:

> 根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M公式,调整maxSize最大值就行,让maxsize最大值小于blocksize就能增加mapper数
> 例如: 
> set mapreduce.input.fileinputformat.split.maxsize=100; select count(*) from emp; 
Hadoop job information for Stage-1: number of mappers: 6; number of reducers:

减少mapper数(小文件合并)

1、在map执行之前将小文件合并,减少mapper数。combineHiceInputFormat具有对小文件合并的功能,hiveinputformat没有该功能。
例如

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

2、在mapReduce任务结束后合并小文件的设置:

在map-only任务结束时合并小文件,默认是true
SET hive.merge.mapfiles = true;
在map-reduce任务结束时合并小文件,默认是false
SET hive.merge.mapredfiles = true;
合并文件大小,默认256M
SET hive.merge.size.per.task = 268435456;
当输出文件平均值小于该大小的时候,启动一个独立的map-reduce任务进行合并
SET hive.merge.smallfiles.avgsize = 16777216;

合理设置reducer数

过多的启动和初始化reducer也会耗费很多时间和资源。
且过多的reduce生产,输出的文件也多很多,如果这些输出文件作为下一个任务的输入,也会出现小文件过多的问题。
但是如果reducer数很少,任务逻辑复杂的话,会很耗费时间,所以合理的设置reducer数也很重要。在设置reducer数的时候,要考虑两个原则:处理大数据量利用合适的reducer数 和使得单个reduce任务处理的数据量大小也要合适。
设置reducer量方法:

1、在hadoop的mapred-default.xml文件修改
set mapreduce.job.reduces = 15;
2、
修改每个reducer处理的数据量的大小 ,默认是256M
hive.exec.reducers.bytes.per.reducer=256000000
每个任务最大的reduce数,默认是1009
hive.exec.reducers.max=1009
计算reducer数的公式,这个条件使用的前提是 mapreduce.job.reduces=-1 就是我们没有人为设置reducer数量,让hive自己推断
N=min(参数2,总输入数据量/参数1)

并行执行

hive在一个查询的时候会转化成一个或者多个阶段,可能是map-reduce阶段,抽样阶段,合并阶段,limit阶段或者其他阶段。默认情况下hive一次只会执行一个阶段。
可能存在某个job包含多个阶段,这些阶段并非相互依赖,就是说有些阶段是可以并行执行的。使得整个job的执行时间缩短。
设置:

hive.exec.parallel为true就可以并行执行,但是job并行执行阶段比较多的情况下,集群利用率就会增加,如果没资源,并行也不会并起来。
set hive.exec.parallel=true;              //打开任务并行执行
set hive.exec.parallel.thread.number=16;  //同一个sql允许最大并行度,默认为8。

严格模式

hive可以设置严格模式防止一些危险操作。
1、分区表不使用分区过滤。
hive.strict.checks.no.partition.filter设置为true时,对分区表,如果where语句没有限制分区条件限制范围,是不允许执行语句的。
开启这个通常是因为分区表有很大的数据集,且数据增加迅速,如果不限制那么查询消耗的资源非常多。
2、使用order by没有limit过滤
将hive.strict.checks.orderby.no.limit设置为true时,对使用了order by 语句的查询,要求必须使用limit。
order by操作为了执行排序会将所有结果发送到同一个reducer中处理,要求增加limit反正reducer执行过程消耗时间过长。
3、笛卡尔积
将hive.strict.checks.cartesian.product设置为true时,会限制笛卡尔积的查询。
笛卡尔积出现的时候,如果表大,这个查询会出现不可控的情况。

影响效率的语句(SQL优化)

union

尽量不要使用union(union会去重复),先union all 再group by 去重

Count(distinct) 去重统计

如果是大数据量的话,count(distinct) 会触发一个reduce task完成count(distinct),这一个reduce处理的数据量太大了,导致整个job很难完成。
一般count(distinct)使用group by 再count()的方式替换。但是要注意group by 的数据倾斜。(会多起一个job,但是在大数据量的时候,是比原来快)
select count(id) from (select id from bigtable group by id) a;

用in 代替join

如果要用另一张表的字段限制某张表的字段,用in比用join 快

select id,name
from table1 a 
join table2 b 
on a.id = b.id ;

用in 代替

select id, name
from table1 a 
where id in (select id from table2 b)

left semi join 用来代替in和exists更高效的方法

select a.id,a.name
from table1 a
left semi join table2 b 
on a.id = b.id 

限制:
1、left semi join 右边的表只能在on处设置过滤条件,在where 和select 都不能过滤
2、left semi join 只是传递表的join key给map阶段,所以left semi join 结果只能出现左表。
3、left semi join 右表遇到重复记录时候,会选择跳过,性能会更高。

jvm重用

让jvm在一个map中重新使用多次

数据倾斜

数据倾斜原因:1、task中需要处理大量相同key 的数据,大量相同key分发给1个reduce,出现倾斜。2、任务读取不可分割的大文件,只能被1以map读取,出现map阶段的倾斜。

key类型

空值引发数据倾斜

实际业务中有大量的空值或无意义的数据参与计算中,表中大量的null值,如果表join操作,必定会引发shuffle产生,使得大量空值在分配给1个reduce中。
注意:虽然一个空值一个不为空,join不上,但是shuffle阶段的hash操作时,key的hash结果都是一样的,都会被分发1个reduce中。
解决方案:
1、不让nul值参与计算,然后再把空值union回

SELECT *
FROM log a
 JOIN users b
 ON a.user_id IS NOT NULL
  AND a.user_id = b.user_id
UNION ALL
SELECT *
FROM log a
WHERE a.user_id IS NULL;

2、给空值null赋值随机值,这样hash结果不一样,就会进入到不同的reduce中。

SELECT *
FROM log a
 LEFT JOIN users b ON CASE 
   WHEN a.user_id IS NULL THEN concat('hive_', rand())
   ELSE a.user_id
  END = b.user_id;

不同数据类型引发数据倾斜

两个表join,表a的key为int,表b的key是string,当按key去join的时候,默认的hash操作会按int类型的id进行分配,然后string类型的id都会被分配到同一个id,从而进入到同一个redce数据倾斜、
解决方案:将string类型转换为int类型

join 业务key倾斜

如果业务数据本身存在热点key,即高频访问key这样的特性,key本身就分布不均匀,那么mr join操作的时候必然会引起数据倾斜。

解决方案:
默认情况下,map阶段的同一key值数据发送到同一个reducer中,当一个key数据过大时就出现数据倾斜问题。
但是并不是所有的聚合操作都必须在reducer端处理完成,很多聚合操作可以在map端进行部份聚合,然后在reduecr得出最后结果。

设置:
set hive.map.aggr = true; 是否在map端聚合
set hive.groupby.mappper.在map端聚合的数据条数
set hive.groupby.skewindata =true;有数据倾斜的时候开启负载均衡。

负载均衡group by 流程:
当选项为true的时候,生成的查询计划会有两个MR job。第一个job中,Map的输出结果会随机分配到reduce中,每个reduce做部分聚合,并输出结果。
这样的好处是相同的group by key 极有可能分发到不同的reduce中。达到负载均衡的目的。
第二个job再根据预处理的结果按照group by key 分布到reduce中,这个过程保证相同的group by key可以分布到同一个reduce,完成最终的聚合操作。
代码:

set hive.groupby.skewindata = true;
select deptno from emp group by deptno;

不可切分文件引发数据倾斜

像GZIP压缩的文件是不可被切分的,只会被一个map读取
解决方案:将GZIP等不可切分的压缩文件转换为可切分的bzip2或者zip等可切分算法。

小文件合并

产生小文件方式:

1、直接插入,insert into table value ,一次插入一个文件
2、load插入数据,load一个文件,hive就一个文件,load一个文件夹,hive文件就是文件夹里面的文件
3、基于查询加载数据insert overwrite table xx select xx :导入数据的时候会启动MR程序,有多少个reduce就有多少个文件
文件数计算公式:
非分区表:文件数=reduce数
分区表:文件数=reduce数*分区数

小文件过多影响

1、小文件过多会导致 存储小文件的hdfs中的namanode元数据特别大,占用太多内存,影响hdfs性能。
2、对于hive说,查询会被转换为MR程序计算,一个小文件都会被当初一个块然后启动一个MR,每启动一个MR任务的启动时间和初始化时间都远远大于逻辑处理时间,会造成大量资源浪费。

解决小文件过多

1、使用hive自带的concate命令,合并小文件

对于非分区表
alter table A concatenate;
对于分区表
alter table B partition(day=20201224) concatenate;
注意:concatenate 命令只支持 RCFILE 和 ORC 文件类型。 

2、调整参数 ,参考合理设置map和reduce数
3、使用hadoop的archive将小文件归档
Hadoop Archive简称HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能将小文件打包成一个HAR文件,减少namenode的内存占用的同时,仍然允许文件的访问。

用来控制归档是否可用
set hive.archive.enabled=true;
通知Hive在创建归档时是否可以设置父目录
set hive.archive.har.parentdir.settable=true;
控制需要归档文件的大小
set har.partfile.size=1099511627776;
使用以下命令进行归档
ALTER TABLE A ARCHIVE PARTITION(dt='2020-12-24', hr='12');
对已归档的分区恢复为原文件
ALTER TABLE A UNARCHIVE PARTITION(dt='2020-12-24', hr='12');

如果是新集群

如果是新集群,没有遗留文件,建议hive用orc文件格式,启用izo压缩,这样小文件过多可以使用hive自带命令concatenate快速合并。

问做过hive优化

答:主要是一些数据倾斜优化、小文件处理优化和语句优化以及一些设置
数据倾斜的优化:包括key倾斜和读取不可切分文件倾斜,其中key倾斜包括空值倾斜和key数据类型倾斜和业务key倾斜,
小文件合并优化
sql语句优化像union all+group by 代替union,group by +count 代替count(distinct),in 代替join,left semi join 代替 in existi,以及行列过滤避免使用select *,
以及一些参数设置,像开启严格模式(禁止分区表全表查询,order by 不加limit,笛卡尔积),或者并行执行(非依赖阶段可以并行执行),本地模式(hive通过本地模式单机上处理所有的任务),fetch抓取(在全局查找、字段查找、limit查找都不走mapreduce)等

以上是关于HIVE优化和数据倾斜合并小文件的主要内容,如果未能解决你的问题,请参考以下文章

Hive 优化笔记

Hive 优化笔记

Spark SQL优化之路——Hive篇

hive优化之小文件合并

Hadoop优化

HIVE优化场景七--数据倾斜--group by 倾斜