Hive性能优化

Posted DataFlow范式

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive性能优化相关的知识,希望对你有一定的参考价值。


架构层面优化:

l  分表

l  合理利用中间结果集,避免查过就丢的资源浪费,减低Hadoop的IO负载

l  常用复杂或低效函数尽量不用或拆分成其他实现方式,如count(distinct)

l  合理设计表分区,静态分区和动态分区

l  优化时一定要把握整体,单个作业最优不如整个作业最优。

l  文件存储格式和压缩方式

l  Hadoop本身的优化

l  有些逻辑,使用系统函数可能比较复杂,可能涉及多层嵌套,建议使用自定义函数实现。

 

架构层面优化,我这里不做过多介绍了,写HQL时要时常考虑按照map-reduce执行方式来写,平时多注意一下,很多问题都可以避免的。下面的介绍的优化中,或多或少对架构层面的优化都有涉及。

参数层面优化手段

合理控制map和reduce数

合并小文件

避免数据倾斜,解决数据倾斜问题

减少job数(合并Job,大Job拆分)

Hive Job优化

合理控制mappers和reducers数

Mappers数

Mappers过多情况下:

l  Map阶段输出文件太小,产生大量小文件

l  初始化和创建Mappers进程的开销很大

 

Mappers太少情况下:

l  文件处理或查询并发度小,Job执行时间过长

l  大量作业时,容易堵塞集群

 

通常情况下,Job会通过输入文件产生一个或多个mapper数,

主要的决定因素有两个:输入的文件数,输入的文件大小。

 

举例:

a)      假设输入只有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个block(6个128M的block和1个12M的block,dfs.block.size是128M),从而产生7个mappers数。

b)      假设输入有3个文件a、b、c,大小分别为10m,20m,130m,那么hadoop会将其分隔为4个block(10m,20m,128m,2m),从而产生4个mappers数。

注释:以上两种情况均不考虑文件合并的情况。

 

两种方式控制Mapper数:即减少mapper数和增加mapper

l  减少mapper数可以通过合并小文件来实现,这点是对文件源处理。

l  增加mapper数可以通过控制上一个job的reducer数来控制(比如:一个sql中多表join会产生多个Map-Reduce任务)。

比如增大mapred.reduce.tasks数值。

 

下面介绍map端的几个控制参数:

l  set mapred.map.tasks=10;

此参数直接设置,有时并不生效,其实它是hadoop的参考数值。

 

下面我说一下直接设置不生效的原因:

默认mapper个数计算为:

# total_size为输入文件总大小,dfs_block_size为HDFS设置的数据块大小(一般为128MB)

default_mapper_num=total_size/dfs_block_size;

 

我们通过参数直接设置的期望mapper个数为:

# setmapred.map.tasks=10;

#这个参数设置只有在大于default_mapper_num的时候,才会生效

goal_mapper_num=mapred.map.tasks;

 

下面我们来计算一下,经过map端split处理的文件大小和个数:

#mapred.min.split.size(数据的最小分割单元大小)

#mapred.min.split.size 设置每个task处理的文件大小,只有在大于dfs_block_size值时才会生效

split_size=max(mapred.min.split.size,dfs_block_size);

split_num=total_size/split_size;

 

最终计算的mapper个数:

compute_mapper_num=min(split_num,max(default_mapper_num,goal_mapper_num))

        

         总结:

         其实根据我自己的实践,调整mapper数之前,我们一定要确定处理的文件大概大小以及文件的存在形式(很多小文件,还是单个大文件以及其他形式),然后合理地调整mapred.min.split.sizemapred.max.split.size的值。

比如,如果想减少mapper个数,则需要增大mapred.min.split.size的值(因为dfs_block_size一般不变)

 

         示例:

情况1:输入文件很大,但不是小文件组成的

增大mapred.min.split.size的值。

 

情况2:输入文件数量很多,且都是小文件,同时每个文件都小于dfs_block_size。

这种情况下通过增大mapred.min.split.size不可行。

原因:增大mapred.min.split.size会造成小文件在网络上来回传输,造成网络负载很大。

 

解决办法:需要设置下面参数,使用合并小文件方法,将多个输入文件合并后送给mapper处理,从而减少mapper的数量。

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

         (下个小章节会介绍小文件合并的优化)

 

l  Map端的聚合,减少Reduce处理负担:

sethive.map.aggr=true;

 

l  推测执行:

set mapred.map.tasks.speculative.execution=true;

(reduce端也有类似的参数:mapred.reduce.tasks.speculative.execution)

所谓的推测执行,就是当所有的task都开始运行之后,Job Tracker会统计所有任务的平均进度,如果某个task所在的task node节点配置比较低或者CPU负载很大,导致任务执行比总体任务的平均执行要慢,此时Job Tracker就会在其他节点启动一个新的任务(duplicatetask),原有任务和新任务哪个先执行完就把其他节点的另外一个任务kill掉。这也是我们经常在Job Tracker页面看到,虽然任务执行成功了,但是发现一些任务被kill掉了,就是这个原因。

 

reduce数

l  Reducers数过多的情况:

生成了很多个小文件(最终输出文件由reducer决定,一个reducer输出一个文件),那么如果这些小文件作为下一个Job输入,则会出现小文件过多需要进行合并的问题。而且启动和初始化reducer需要耗费时间和资源。

 

l  Reducers数过少:

执行耗时,并且可能出现数据倾斜

l  Reducer个数的决定:

           默认情况下,Hive分配reducer个数由下列参数决定:

                    参数1:hive.exec.reducers.bytes.per.reducer(默认为1G)

                    参数2:hive.exec.reducers.max(默认为999)

         计算reducer数的公式:

           N=min(参数2,总输入数据量/参数1)

           即默认一个reduce处理1G数据量。

注意:与mapred.map.tasks参数不同,如果设置了setmapred.reduce.tasks参数的数值,忽略上述计算,reducer个数可以由mapred.reduce.tasks直接指定。

 

l  以下情况只有一个reducer:

某些情况下我们会发现任务中不管数据量多大,不管怎么调整reducer相关的的参数,任务中一直都只有一个reducer任务:

1、  除了数据量小于hive.exec.reducers.bytes.per.reducer参数值的情况外

2、  用了group by的汇总

3、  用了order by

 

l  Reduce数决定中间或落地文件数,文件大小和HDFS的block大小无关。

 

l  使用场景描述:

当某个任务有多个Job时,其中某个Job的结果被后面Job多次引用时,设大该参数,以便增加后面访问的Mapper数。

比如,如果一个Job的输出被另外多个Job调用,假如最前面的Job只生成1G的一个文件,那么后面Job也只会有一个Map来处理,效率明显低很多。

 

l  推测执行

setmapred.reduce.tasks.speculative.execution =true;

sethive.mapred.reduce.tasks.speculative.execution =true;

可以看到除了Map-Reduce侧提供推测执行参数,hive侧也提供了推测执行的参数。

合并小文件

Map阶段Hive自动对小文件进行合并

参数控制:

#Map任务结束时就会合并小文件(Map-Only)

set hive.merge.mapfiles=true;

 

#在Map-Reduce的任务结束时合并小文件

set hive.merge.mapredfiles=true;

 

#合并文件的大小(256MB)

set hive.merge.size.per.task=256000000;

 

#每个mapper最大分隔大小(输入大小)

#结合上面块大小(dfs.block.size=128MB),决定拆分几个mapper数

set mapred.max.split.size=256000000;

 

#一个节点上split至少的大小

set mapred.min.split.size.per.node=100000000;

 

#执行Map前进行小文件合并

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

在开启了org.apache.hadoop.hive.ql.io.CombineHiveInputFormat之后,一个datanode节点上多个小文件会进行合并,合并文件数由mapred.max.split.size限制的大小决定。

mapred.min.split.size.per.node决定多个datanode上的文件是否需要合并,即多个节点上的文件也可以合并,大小由此决定。

 

l  Job合并输入小文件

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

多个小文件由一个map执行。

合并文件数由mapred.max.split.size限制的大小决定。

 

l  Job合并输出小文件

sethive.merge.smallfiles.avgsize=256000000;

当输出文件平均大小小于该值,启动新job用于合并文件。

对于多个job时,前一个job输出很多大小不均匀的数据文件,对后续的job处理会造成数据倾斜的问题。

如果输出文件大小均匀,则后续处理的mapper数比较合理。

sethive.merge.size.per.task=64000000;

合并之后的文件大小。

 

案例(那我们自己的开发环境来测试,我们的环境dfs.block.size为64MB):

环境如下(默认配置):

set dfs.block.size=134217728;

sethive.merge.mapfiles=true;

sethive.merge.mapredfiles=false;

sethive.merge.size.per.task=256000000;

setmapred.max.split.size=256000000;

setmapred.min.split.size.per.node=256000000;

setmapred.min.split.size.per.rack=256000000;

sethive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

 

我构造的表如下:

hive (annuity_safe)> desc formattedtest_data;

OK

col_name        data_type       comment

# col_name              data_type               comment

 

deptno                  string

polno                   string

certno                  string

brno                    decimal(2,0)

………………………………

set_of_books_id         string

is_return               string

pk_serial               string

op_month                string

 

# Detailed Table Information

Database:               annuity_safe

Owner:                  hduser0103

CreateTime:             Tue Jul 28 11:41:20 CST 2015

LastAccessTime:         UNKNOWN

Protect Mode:           None

Retention:              0

Location:               hdfs://dev-l002781.app.paic.com.cn:9000/user/hive/warehouse/annuity_safe.db/test_data

Table Type:             MANAGED_TABLE

Table Parameters:

       COLUMN_STATS_ACCURATE   true

       numFiles                5

       numRows                 0

       rawDataSize             0

       totalSize               279752108

       transient_lastDdlTime   1438055637

 

# Storage Information

SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

InputFormat:           org.apache.hadoop.mapred.TextInputFormat

OutputFormat:          org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

Compressed:             No

Num Buckets:            -1

Bucket Columns:         []

Sort Columns:           []

Storage Desc Params:

       serialization.format    1

Time taken: 0.083 seconds, Fetched: 95row(s)

 

构造的表对应的文件路径下有5个文件,文件大小除了xae文件为11MB,其他都大概为64MB。

hive (annuity_safe)> ! hadoop fs -lsrhdfs://dev-l002781.app.paic.com.cn:9000/user/hive/warehouse/annuity_safe.db/test_data;

-rw-r-----   3hduser0103 hduser0103   671088652015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xaa

-rw-r-----   3hduser0103 hduser0103   671088642015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xab

-rw-r-----   3hduser0103 hduser0103   671088642015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xac

-rw-r-----   3hduser0103 hduser0103   671088642015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xad

-rw-r-----   3hduser0103 hduser0103   113166512015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xae

 

执行查询(未优化参数情况下):

hive (annuity_safe)> selectdeptno,count(1),min(fcd),max(fcd) from test_data group by deptno;

Total jobs = 1

Launching Job 1 out of 1

Number of reduce tasks not specified.Estimated from input data size: 1

In order to change the average load for areducer (in bytes):

  sethive.exec.reducers.bytes.per.reducer=<number>

In order to limit the maximum number ofreducers:

  sethive.exec.reducers.max=<number>

In order to set a constant number ofreducers:

  setmapred.reduce.tasks=<number>

Starting Job = job_201507191627_38369,Tracking URL =http://dev-l002781.app.paic.com.cn:50030/jobdetails.jsp?user.name=hadoop&jobid=job_201507191627_38369

Kill Command =/appcom/HadoopInstall/hadoop-1.2.1/libexec/../bin/hadoop job  -kill job_201507191627_38369

Hadoop job information for Stage-1: number of mappers:4; number of reducers: 1

2015-07-28 13:44:01,643 Stage-1 map =0%,  reduce = 0%

2015-07-28 13:44:13,786 Stage-1 map =50%,  reduce = 0%, Cumulative CPU 12.15sec

2015-07-28 13:44:14,800 Stage-1 map = 75%,  reduce = 0%, Cumulative CPU 13.89 sec

2015-07-28 13:44:37,214 Stage-1 map =100%,  reduce = 0%, Cumulative CPU 18.61sec

2015-07-28 13:44:38,228 Stage-1 map =100%,  reduce = 33%, Cumulative CPU 18.61sec

2015-07-28 13:44:40,254 Stage-1 map =100%,  reduce = 100%, Cumulative CPU22.29 sec

MapReduce Total cumulative CPU time: 22seconds 290 msec

Ended Job = job_201507191627_38369

MapReduce Jobs Launched:

Job 0: Map: 4  Reduce: 1  Cumulative CPU: 22.29 sec   HDFSRead: 279753293 HDFS Write: 6927 SUCCESS

Total MapReduce CPU Time Spent: 22 seconds290 msec

OK

deptno _c1     _c2     _c3

-08 01:20:15.0  1      NULL    NULL

0000   1       NULL    NULL

01000592864     1      G000000000      G000000000

71679  1       G000000000      G000000000

G01    1       2001-07-19 15:46:42.0   2001-07-19 15:46:42.0

G010103 9       2002-09-17 14:08:45.0   2002-09-17 14:10:44.0

G0123  6161    2006-08-16 14:59:06.0   2010-01-18 15:06:14.0

G014205 51893   2010-10-08 01:05:01.0   2010-10-08 01:23:55.0

G014302 4       2011-09-26 19:12:36.0   2011-09-26 19:12:36.0

G02    32      2000-07-18 09:53:52.0   2002-06-10 13:14:24.0

G020105 10      2000-06-20 14:08:23.0   2000-08-08 10:37:10.0

G020301 5       2000-08-15 10:35:00.0   2000-08-21 11:29:07.0

G020302 2       2000-08-17 09:31:00.0   2000-09-08 10:45:39.0

…………………………………………………

 

可以发现,出现了4个mappers来处理数据文件。

我们查看页面查看4个task的Counters发现,4个task读取的文件字节数为:

67,109,107

201,327,048

11,316,894

244

加起来为268MB左右,但是每个task处理的数据不均匀,其中有一个task处理了约200MB的数据,一个task处理了244字节的数据,便会出现木桶效应。

 

 

案例优化:

1、 环境的参数优化:

set hive.merge.mapfiles=true;

set hive.merge.mapredfiles=true;

sethive.merge.size.per.task=64000000;

set mapred.min.split.size=64000000;

set mapred.max.split.size=64000000;

setmapred.min.split.size.per.node=64000000;

set mapred.min.split.size.per.rack=64000000;

sethive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

因为合并小文件默认为true,而dfs.block.size与hive.merge.size.per.task的搭配使得合并后的绝大部分文件都在64MB左右。

 

         我们不用执行上面的查询语句,就大概可以分析如下:

对于上面表对应的5个数据文件:4个为64MB,1个为11MB,那么上面的查询,会有5个mapper,其中4个mapper分布处理64MB的数据,其他1个mapper分布处理11MB的数据。

2、  运行上面的HQL,出现的结果与我们上面分析的一致。

 

总结:这里我只是提供了一种优化的思路,其实这里还不是最优,我们可以实际的Hadoop环境,hdfsblock大小以及表对应的数据文件,来调整上面的参数(比如将64MB全部修改为128MB,或许更快一点),最终每个task处理的数据大致相同,均衡IO负载,以达到资源最佳使用。

数据倾斜

1、   什么是数据倾斜?

Hadoop框架的特性决定最怕数据倾斜。

JobTracker和TaskTracker关系相当于老师和学生的关系,JobTracker分发任务给TaskTracker去处理各自的工作,但是不是平均分配的,还得根据TaskTracker本地的数据量多少去做判断。如果每个节点数据分配不均匀,势必造成有的TaskTracker处理的数据量大,有的处理的数据量小。

由于数据分布不均匀,可能会造成数据大量集中到一个节点或极少数个节点,造成数据热点。

 

2、   数据倾斜造成的症状:

map阶段快,reduce阶段非常慢,

          某些mapper很快,某些mapper很慢,

某些reducer很快,某些reducer奇慢。

 

3、   数据倾斜可能在如下场景中出现:

A、 数据在节点上分布不均匀(无法避免)

B、 join时on关键词中个别值量很大(如null值)

C、 count(distinct)在数据量大的情况下,容易数据倾斜,因为count(distinct)是按group by字段分组,按distinct字段排序(有时无法避免)。

其中A无法避免,B见后边介绍的Join优化部分,C语法上有时无法避免。

 

关键词

情形

后果

Join

其中一个表较小,

但是key集中

分发到某一个或几个Reducer上的数据远高于平均值

大表与大表,但是分桶的判断字段0值或空值过多

这些空值都由一个reducer处理,非常慢

group by

group by 维度过小,

某值的数量过多

处理某值的reducer灰常耗时

Count Distinct

某特殊值过多

处理此特殊值的reducer耗时

 

减少job数(合并Job,大Job拆分)

1、   减少Job数

当源表相同时,如下可以合并Job,从而减少job数:

l  Join时,On 字段相同

多表join on条件相同时,合并为一个Map-Reduce。

select pt.page_id,count(t.url) pv

from rpt_page_type pt

join

(select refer_page_id,url_page_id,url from trackinfo where ds = ‘2013-10-11’)t

on pt.page_id = t.url_page_id

join

(select page_id from rpt_page_kpi_new where ds = ‘2013-10-11’)r

on t.url_page_id= r.page_id

group by  pt.page_id;

 

利用这个特性,可以把相同join on条件的放在一个job处理。

 

l  union all

对同一个表的union all只查询一次源表,Hive本身对这种union all做过优化。

selecturl,session_id from

(selecturl,session_id

from trackinfowhere ds=’2013-11-01’

union all

selecturl,session_id

from trackinfowhere ds=’2013-11-02’

)t;

 

 

l  Multi-insert(Multi-group by一定会和Multi-insert一起使用,同一源表,可按照不同where、不同group by进行计算)

条件:源表相同,上方的SQL等同于:

from trackinfo

 insert overwrite table tmp_testpartition(step=1)

select url,session_id where ds=’2013-11-01’

insert overwrite table tmp_test partition(step=2)

selecturl,session_id where ds=’2013-11-02’;

 

Hive Job优化

l  并行化执行

每个查询被hive转化成多个阶段,有些阶段关联性不大,则可以并行化执行,减少执行时间。

sethive.exec.parallel=true;

sethive.exec.parallel.thread.number=15;

 

实例:

select num

from

(selectcount(city) as num

from city

union all

selectcount(province) as num

fromprovince)tmp;

 

union all两侧的查询语句会同时执行。

 

l  本地化执行(感觉生产环境用处不大)

sethive.exec.mode.local.auto=true;

当一个job满足如下条件才能真正使用本地模式:

a、   job的输入数据大小必须小于参数:

hive.exec.mode.local.auto.inputbytes.max(默认128MB)

 

b、   job的map数必须小于参数:

hive.exec.mode.local.auto.tasks.max(默认为4)

 

c、  jobreducer数必须为0或者1

 

如果你的环境不满足上述条件时,执行过程会提示原因,如Input size大于hive.exec.mode.local.auto.inputbytes.max值,或input files个数大于hive.exec.mode.local.auto.tasks.max值,同时会取消本地化执行,改为其他方式执行。

 

l  JVM重利用

set mapred.job.reuse.jvm.num.tasks=15;

JVM重利用可以是Job长时间保留slot,直到作业结束,这对于有较多任务和较多小文件的任务是非常有意义的,因为减少了JVM的启动和初始化时间,从而减少执行时间。当然这个值不能设置过大,因为有些作业会有reduce任务,如果reduce任务没有完成,则map任务占用的slot不能释放,其他作业可能就需要等待。

 

l  Hive压缩数据

中间压缩就是处理hive查询的多个job之间的数据,对于中间压缩,最好选择一个节省CPU耗时的压缩方式(即压缩率比较适中)。

set hive.exec.compress.intermediate=true;

set hive.intermediate.compression.codec=

org.apache.hadoop.io.compress.SnappyCodec;

sethive.intermediate.compression.type=BLOCK;

 

Hive查询最终的输出结果文件采用压缩(落地文件的压缩率可以选择较高的压缩率)

set hive.exec.compress.output=true;

set mapred.output.compression.codec=

org.apache.hadoop.io.compress.GzipCodec;

setmapred.output.compression.type=BLOCK;

语法(包含参数)层面优化

Join

Mapjoin

Bucket join

Group by

Count(distinct)

笛卡尔积

提前裁剪数据,避免资源浪费

Hive表的优化

Join优化

l  数据按照join的key进行分发,而在join左边的表的数据会首先部分或全部读入内存,如果左边表的key相对分散(单个key值数据量小,或者说相同key的数据量小),读入内存的数据会比较小,join任务执行会比较快,而如果左边的表key比较集中,而这张表的数据量又很大,那么数据倾斜就会比较严重。

            Map阶段同一key数据会分发给同一个reducer计算。

 

l  join原则:

1)       小表join大表

在join操作的Reduce阶段(不是map阶段),位于join左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生内存进出的几率。

                解决办法:

多个表关联时,最好分拆成几个小段,避免大sql(无法控制中间job)。

 

2)       大表join大表

大表关联中,如果join的key中含有大量null,在使用key进行hash分发时,会将数据文件中key为空的数据都分到一个节点,造成了数据倾斜。

解决办法:

把空值的key变成一个字符串加上随机数,把倾斜的数据分发到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。

 

l  Join中对join key存在大量空值的优化演示:

end_user_id中存在大量null值。

原始HQL:

select u.id, t.url, t.track_time

from end_user u

join

(select end_user_id,url,track_time from trackinfo where ds= ‘2013-12-01‘)t

on u.id = t.end_user_id limit 2;

 

优化后HQL为:

select u.id, t.url, t.track_time

from end_user u

join

(select case when end_user_id = ‘null‘ or end_user_id is null

                            then cast(concat(‘00000000’,floor(rand()*1000000))as bigint)

                            else end_user_id endend_user_id,

                            url,track_time

from trackinfo where ds= ‘2013-12-01‘) t

on u.id = t.end_user_id limit 2;

 

Join对数据倾斜的参数优化:

set hive.optimize.skewjoin=true;

                   如果在join过程中出现倾斜,参数值应该设置为true。

                  

set hive.skewjoin.key=1000000;

                   这个是join的键(key)对应的记录条数超过这个值则会进行join自动优化。

                   上面两个参数设置后的优化原理是:

                            没优化之前,join会启动一个job,但是设置优化参数后,会启动两个job。

第一个job会将键(key)超过hive.skewjoin.key记录的键加上一些随机数等,将这些相同的key打乱,然后跑到不同的节点上面进行计算(reduce阶段)。然后再启动一个job,在第一个job处理的基础上(即第一个job的reduce输出结果)再进行计算,将相同的key分发到相同的节点上处理。

 

l  Join时的关联键key的数据类型一定要相同,否则会产生数据倾斜问题

由于test_a表中的id为字符串型,所以我们将test_b表数字类型转换成字符串类型

select a.* fromtest_a a

left outer jointest_b b

On a.id =cast(b.id as string);

Mapjoin

Mapjoin(map端执行join操作):

mapjoin的计算原理:

mapjoin会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配。

 

l  Join 操作在Map阶段完成,如果需要的数据在Map的过程中可以处理掉,则不再需要Reduce阶段,加快了执行效率。

小表关联一个超大表时,容易发生数据倾斜,可以用Mapjoin把小表全部加载到内存,并在map端进行join操作,避免reducer处理。

如:

insert overwrite table page_pv

select /*+ MapJoin(pt)*/

                   pt.page_id,count(t.url) pv

            from rpt_page_type pt

join

(select url_page_id, url from trackinfo where ds = ‘2013-10-11‘) t

on pt.page_id = t.url_page_id;

 

l  mapjoin的使用场景:

1)      关联操作中有一张表非常小

2)      不等值的连接操作

 

l  Mapjoin两种使用方式:

1)      通过参数设置,Hive自动选择执行Mapjoin操作

hive.auto.convert.join=true;

                      hive.mapjoin.smalltable.filesize=25000000;-------默认为25MB

原理:将小于hive.mapjoin.smalltable.filesize数值的表加载到分布式缓存中,这样整个集群节点上map端任务都可以访问缓存中的数据。

                           

2)      另外一种方式,可以不设置参数,通过hint方式指定:

select /*+ mapjoin(test_b) */a.key,a.value fromtest_a a join test_b b on a.key = b.key;

 

l  Mapjoin其他参数设置

set hive.mapjoin.cache.numrows=25000;

说明:mapjoin存在内存里的数据量。

 

set hive.mapjoin.followby.gby.localtask.max.memory.usage=0.55;

说明:map join做group by操作时,可以使用多大的内存来存储数据,如果数据太大,则不会保存在内存里。

         

set hive.mapjoin.localtask.max.memory.usage=0.90;

说明:本地任务可以使用内存的百分比

 

bucket join

l  使用bucket join需要满足下面两个条件

(1)      两个表以相同方式(key)划分桶

(2)      两个表的桶个数是倍数关系

create table order(cidint,price decimal(18,2)) clustered by (cid) into 32 buckets;

create tablecustomer(id int,first string,last string) clustered by (id) into 32(or 64……)buckets;

select pricefrom order o join customer c on o.cid = c.id;

说明:

查询语法与普通表一样,但是底层执行却不一样。根据key只会查找对应的桶即可,比如:如果cid=1,那么只会从customer中查找id=1的数据,这些数据都位于一个桶中,所以只需访问一个桶即可。

 

group by

l  Map端部分聚合:

                   并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。

 

l  Map端部分聚合参数:

    #是否在map端进行聚合

                   hive.map.aggr=true;

 

l  有数据倾斜的时候进行负载均衡

#如果group by过程中出现倾斜,应该设置为true

    hive.groupby.skewindata=true;

                  

#在map端进行聚合操作的条目数目

                   #这个是group by的键对应的记录条数超过这个值则会进行优化

                   hive.groupby.mapaggr.checkinterval=100000;

 

和mapjoin类似,group by优化后也会启动两个Job。

当选项设为true时,生成的查询计划会有两个MR job,第一个MR job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group by key有可能被分发到不同的Reduce中,从而达到负载均衡的目的。

第二个MR job再根据预处理的数据结果按照group bykey分布到Reduce中(这个过程可以保证相同的group by 被分发到同一个Reduce中),最后完成最终的聚合操作。

 

Count(distinct)

l  当该字段存在大量值为null或空的记录时容易造成倾斜。

解决思路:

1)  count(distinct)时,将值为空的数据在where里过滤掉,在最后结果中加1。

2)  如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union

3)  如果group by维度过小,则可以  采用count和group by的方式来替换count(distinct)完成计算

l  特殊情况特殊处理:

在业务逻辑优化效果不大情况下,有些时候是可以将倾斜的数据单独拿出来处理,最后union回去。

 

countdistinct优化:

实例1

         优化前:

         selectcount(distinct id) from student;

         只有一个job任务,而且只有一个reduce,处理的工作量比较大。

        

优化后:

         selectcount(1) from (select distinct id from student) tmp;

         或

         selectcount(1) from (select id from student group by id) tmp;

          可以通过设置set mapred.reduce.tasks的值,加快(select distinct id from student) tmp部分的处理。

 

实例2

优化前:

selecta,sum(b),count(distinct c),count(distinct d)

from test

group by a;

优化后:

select a,sum(b)as b,count(c) as c,count(d) as d

from (

select a,0 as b,c,null as d from test group by a,c

union all

select a,0 as b,null as c,d from test group by a,d

union all

select a,b,null as c,null as d from test

)tmp group bya;

 

笛卡尔积

尽量避免笛卡尔积,join的时候不加on条件,或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积。

 

之前我有遇到过一种情况,不得不使用笛卡尔积,表关联的条件为不等式,还好两张表不大。如果你不得不使用笛卡尔积,那么一定要看一下其中的表是否符合Mapjoin的要求,如果符合,那么一定要使用Mapjoin。

提前裁剪数据,避免资源浪费

join优化前:

select o.cid,c.id

from order o

join customer c

on o.cid = c.id

where o.dt = ‘2015-07-26‘;

 

join优化后:

select o.cid,c.id

from

(select cid

from order

where dt = ‘2015-07-26‘

)o

join customer c

on o.cid = c.id;

 

对一些过滤条件,能尽早过滤的就尽早过滤,减少IO资源浪费。

这个需要个人工作中注意就好了。

Hive表的优化

分区:

1)      静态分区

2)      动态分区

set hive.exec.dynamic.partition=true;

set hive.exec.dynamic.partition.mode=nonstrict;

 

分桶

sethive.enforce.bucketing=true;

sethive.enforce.sorting=true;

 

数据

相同数据尽量聚集在一起,和分桶原理类似,尽量减少网络数据传输

 

以上是关于Hive性能优化的主要内容,如果未能解决你的问题,请参考以下文章

Hive架构原理和性能优化

Ceph架构及性能优化

数据库优化从架构优层面化

从代码层面优化系统性能应该怎么做?

零零好车代码层面优化系统开发性能应该怎么做?

阿里P8架构师(花名:霍州)Java程序性能优化“学习日记”