Hive Join 分析和优化

Posted 数据与算法联盟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive Join 分析和优化相关的知识,希望对你有一定的参考价值。


来自实战

1 ·.    背景

Sku对应品牌进行关联,大表对应非大表(这里的非大表并不能用小表来定义)


2 ·.     问题分析

进行表左关联时,最后一个reduce任务卡到99%,运行时间很长,发生了严重的数据倾斜。

什么是数据倾斜?数据倾斜主要表现在,map /reduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍之多),这条key所在的reduce节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完。


3 ·.  Hive join

MapReduce流程图

Hive Join 分析和优化

Join流程

HQL:SELECT a.id, a.dept, b.age FROM a join b ON a.id = b.id

Hive Join 分析和优化

Map阶段

1:读取源表的数据,Map输出时候以Join on条件中的列为key,如果Join有多个关联键,则以这些关联键的组合作为key; 

2:Map输出的value为join之后所关心的(select或者where中需要用到的)列;同时在value中还会包含表的Tag信息,用于标明此value对应哪个表; 

3:按照key进行排序;

4:map的个数取决于数据量和hdfs的分片大小,比如说hdfs分片配置的是128M(也可能是256M),然后数据目录下有一个文件为300M,那么对于的map个数为3个;如果目录下有三个文件,分别为20M,40M,200M,则对应的map个数为4个。

5:Map个数是不是越多越好?答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。

6:是不是保证每个map处理接近128m的文件块,就可以了? 答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。


Shuffle阶段

根据key的值进行hash,并将key/value按照hash值推送至不同的reduce中,这样确保两个表中相同的key位于同一个reduce中


Reduce阶段

1:根据key的值完成join操作,期间通过Tag来识别不同表中的数据

2:reduce的个数在不进行指定的情况下,hive会猜测一个reduce个数,基于以下两个设定:

Hive Join 分析和优化

计算reducer数的公式很简单N=min(参数2,总输入数据量/参数1) 

即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务

3:reduce个数设置

1) 调整hive.exec.reducers.bytes.per.reducer参数的值; 

set hive.exec.reducers.bytes.per.reducer=500000000; (500M) 

2) set mapred.reduce.tasks = 15; 

4:reduce个数是不是越多越好?同map一样,启动和初始化reduce也会消耗时间和资源; 另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;


Hive Join类型

参考:http://lxw1234.com/archives/2015/06/315.htm


4 ·.     优化

1:hive启动的set设置

set hive.exec.compress.output = true; 
-- map/reduce 输出压缩(一般采用序列化文件存储)
set mapred.output.compression.codec = com.hadoop.compression.lzo.LzopCodec; -- 压缩文件格式为 lzo

set hive.map.aggr = true; -- map端是否聚合

set hive.merge.mapfiles = true; -- map输出是否合并

set hive.merge.mapredfiles = true; -- reduce输出是否合并

set hive.merge.size.per.task = 256000000; -- 设置的合并文件的大小

set hive.merge.smallfiles.avgsize = 134000000; -- 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge

set hive.groupby.mapaggr.checkinterval = 100000;
--这个是group的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置

set hive.groupby.skewindata = true;
-- 数据倾斜优化,true,查询计划生产两个mapreduce,第一个mr随机处理,第二个按照业务主键聚合

set hive.optimize.skewjoin = true;
-- 如果是join 过程出现倾斜 应该设置为true

set hive.skewjoin.key = 100000;
--这个是join的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置

set hive.exec.parallel = true; -- 同一个sql中的不同的job是否可以同时运行

set mapred.reduce.tasks = 2000设置reduce 个数

set mapred.max.split.size = 100000000; -- 一个split 最大的大小

set mapred.min.split.size.per.node = 100000000; -- 一个节点上(datanodesplit至少的大小

set mapred.min.split.size.per.rack = 100000000; -- 同一个交换机(rack locality)split至少的大小

set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- 执行Map前进行小文件合并

set mapred.queue.names = xxxxxxxx -- 设置队列


2:HQL本身注意事项

1):大小表 内关联时,小表放前,加载到内存中

2):两表join的key,做空值、不需要的值,不合格的值过滤,然后用hash 或者随机数 结合case when then else进行处理

eg:需要关联的key 为 brand_id,然后表中大部分brand_id ='0',这是时候在关联时就要进行过滤,但仅仅过滤的话,这些为0的brand_id就会分配到一个reduce中,所以还要加个rand函数

on

    case

    when online.brand_code != '0'

    then online.brand_code

    else cast(ceiling(rand() * - 65535) as string)

    end = brand.brand_idcase

3):如果关联的某个key是必须的,但是下边又有很多条数据记录,然后导致数据倾斜

hash即可,比如说我要以三级品类和品牌为key进行join,但某个三级品类对于的品牌下的sku个数特别多,就会发生数据倾斜。


5 ·.     参考资料

http://shiyanjun.cn/archives/588.html

https://blog.csdn.net/B11050101/article/details/78754652



◆  ◆  ◆  ◆  

往期精彩回顾



以上是关于Hive Join 分析和优化的主要内容,如果未能解决你的问题,请参考以下文章

Hive Join 优化之 Map Join

hive的join优化

Hive性能优化之计算Job执行优化

hive的join优化

hive优化方式总结

hive join 优化 --小表join大表