Hive 调优 小总结
Posted 职场spark哥
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive 调优 小总结相关的知识,希望对你有一定的参考价值。
hive 优化
以下优化的一些业务场景没有遇到也很难理解,所以可以通过我标记的着重看一下,其他的可以大概的过以下。
1 . Hadoop计算框架特性
数据量大不是问题,数据倾斜是个问题。
jobs数比较多的作业运行效率相对比较低,比如即使有几百行的表,如果多次关联多次汇总,产生十几个jobs,耗时很长。
原因是map reduce作业初始化的时间是比较长的。
sum,count,max,min等UDAF,不怕数据倾斜问题,hadoop在map端的汇总合并优化,使数据倾斜不成问题。
count(distinct ),在数据量大的情况下,效率较低,如果是多count(distinct )效率更低,因为count(distinct)是按group by 字段分组,按distinct字段排序,一般这种分布方式是很倾斜的,比如男uv,女uv,淘宝一天30亿的pv,如果按性别分组,分配2个reduce,每个reduce处理15亿数据。
备注一下 :hive底层就是map reduce 哦
2.优化的常用手段
好的模型设计事半功倍。
解决数据倾斜问题。
减少job数。
2-1 设置合理的map reduce的task数,能有效提升性能。(比如,10w+级别的计算,用160个reduce,那是相当的浪费,1个足够)。
了解数据分布,自己动手解决数据倾斜问题是个不错的选择。set hive.groupby.skewindata=true;这是通用的算法优化,但算法优化有时不能适应特定业务背景,开发人员了解业务,了解数据,可以通过业务逻辑精确有效的解决数据倾斜问题。
备注一下 :这个面试的时候可以说:"凭自己多年的经验发现此:
数据量较大的情况下,慎用count(distinct),count(distinct)容易产生倾斜问题。
2-2 对小文件进行合并,是行至有效的提高调度效率的方法,假如所有的作业设置合理的文件数,对云梯的整体调度效率也会产生积极的正向影响。
优化时把握整体,单个作业最优不如整体最优。
个人通过以上 进行进一步总结:
解决收据倾斜问题
减少job数量,设置合理的
map和reduce个数,对小文件进行合并,优化时把握整体,单个task最优不如整体最优。按照一定规则分区。
3. 全排序
Hive的排序关键字是SORT BY,它有意区别于传统数据库的ORDER BY也是为了强调两者的区别–SORT BY只能在单机范围内排序。
这就涉及到一个全排序的问题。解决的办法无外乎两种:
1.) 不分发数据,使用单个reducer:
set mapred.reduce.tasks=1;
这一方法的缺陷在于reduce端成为了性能瓶颈,而且在数据量大的情况下一般都无法得到结果。但是实践中这仍然是最常用的方法,原因是通常排序的查询是为了得到排名靠前的若干结果,因此可以用limit子句大大减少数据量。使用limit n后,传输到reduce端(单机)的数据记录数就减少到n* (map个数)。
2.) 修改Partitioner,这种方法可以做到全排序。这里可以使用Hadoop自带的TotalOrderPartitioner(来自于Yahoo!的TeraSort项目),这是一个为了支持跨reducer分发有序数据开发的Partitioner,它需要一个SequenceFile格式的文件指定分发的数据区间。区间文件需要考虑的主要问题是数据分发的均衡性,这有赖于对数据深入的理解。
4.怎样做笛卡尔积
当Hive设定为严格模式(hive.mapred.mode=strict)时,不允许在HQL语句中出现笛卡尔积,这实际说明了Hive对笛卡尔积支持较弱。因为找不到Join key,Hive只能使用1个reducer来完成笛卡尔积。
当然也可以用上面说的limit的办法来减少某个表参与join的数据量,但对于需要笛卡尔积语义的需求来说,经常是一个大表和一个小表的Join操作,结果仍然很大(以至于无法用单机处理),这时MapJoin才是最好的解决办法。
MapJoin,顾名思义,会在Map端完成Join操作。这需要将Join操作的一个或多个表完全读入内存。
MapJoin的用法是在查询/子查询的SELECT关键字后面添加/*+ MAPJOIN(tablelist) */提示优化器转化为MapJoin(目前Hive的优化器不能自动优化MapJoin)。其中tablelist可以是一个表,或以逗号连接的表的列表。tablelist中的表将会读入内存,应该将小表写在这里。
PS:有用户说MapJoin在子查询中可能出现未知BUG。在大表和小表做笛卡尔积时,规避笛卡尔积的方法是,给Join添加一个Join key,原理很简单:将小表扩充一列join key,并将小表的条目复制数倍,join key各不相同;将大表扩充一列join key为随机数。
5. 怎样写exist/in子句
Hive不支持where子句中的子查询,SQL常用的exist in子句需要改写。这一改写相对简单。考虑以下SQL查询语句:
SELECT a.key, a.value
FROM a
WHERE a.key in
(SELECT b.key
FROM B);
可以改写为
SELECT a.key, a.value
FROM a LEFT OUTER JOIN b ON (a.key =b.key)
WHERE b.key <> NULL;
一个更高效的实现是利用left semi join改写为:
SELECT a.key, a.val
FROM a LEFT SEMI JOIN b on (a.key =b.key);
left semi join是0.5.0以上版本的特性。
6 怎样决定reducer个数
Hadoop MapReduce程序中,reducer个数的设定极大影响执行效率,这使得Hive怎样决定reducer个数成为一个关键问题。遗憾的是Hive的估计机制很弱,不指定reducer个数的情况下,Hive会猜测确定一个reducer个数,基于以下两个设定:
1. hive.exec.reducers.bytes.per.reducer(默认为1000^3)
2. hive.exec.reducers.max(默认为999)
计算reducer数的公式很简单:
N=min(参数2,总输入数据量/参数1)
通常情况下,有必要手动指定reducer个数。考虑到map阶段的输出数据量通常会比输入有大幅减少,因此即使不设定reducer个数,重设参数2还是必要的。依据Hadoop的经验,可以将参数2设定为0.95*(集群中TaskTracker个数)。
7 合并MapReduce操作
Multi-group by
Multi-group by是Hive的一个非常好的特性,它使得Hive中利用中间结果变得非常方便。例如,
FROM (SELECT a.status, b.school,b.gender
FROM status_updates a JOIN profilesb
ON (a.userid = b.userid and
a.ds=’2009-03-20′ )
) subq1
INSERT OVERWRITE TABLEgender_summary
PARTITION(ds=’2009-03-20′)
SELECT subq1.gender, COUNT(1) GROUPBY subq1.gender
INSERT OVERWRITE TABLEschool_summary
PARTITION(ds=’2009-03-20′)
SELECT subq1.school, COUNT(1) GROUPBY subq1.school
上述查询语句使用了Multi-group by特性连续group by了2次数据,使用不同的groupby key。这一特性可以减少一次MapReduce操作。
Multi-distinct
Multi-distinct是淘宝开发的另一个multi-xxx特性,使用Multi-distinct可以在同一查询/子查询中使用多个distinct,这同样减少了多次MapReduce操作
8 Bucket
与sampling Bucket是指将数据以指定列的值为key进行hash,hash到指定数目的桶中。这样就可以支持高效采样了。
如下例就是以userid这一列为bucket的依据,共设置32个buckets
CREATETABLE page_view(viewTime INT, userid BIGINT,
page_url STRING,referrer_url STRING,
ip STRING COMMENT ‘IPAddress of the User’)
COMMENT ‘This is the page view table’
PARTITIONED BY(dt STRING, country STRING)
CLUSTEREDBY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ’1′
COLLECTION ITEMS TERMINATED BY ’2′
MAP KEYS TERMINATED BY ’3′
STORED AS SEQUENCEFILE;[王黎16]
Sampling可以在全体数据上进行采样,这样效率自然就低,它还是要去访问所有数据。而如果一个表已经对某一列制作了bucket,就可以采样所有桶中指定序号的某个桶,这就减少了访问量。
如下例所示就是采样了page_view中32个桶中的第三个桶。
SELECT *FROM page_view
TABLESAMPLE(BUCKET 3 OUT OF 32);
9 Partition
Partition就是分区。分区通过在创建表时启用partition by实现,用来partition的维度并不是实际数据的某一列,具体分区的标志是由插入内容时给定的。当要查询某一分区的内容时可以采用where语句,形似where tablename.partition_key >a来实现。
创建含分区的表
CREATE TABLE page_view(viewTime INT, useridBIGINT,
page_url STRING, referrer_url STRING,
ip STRING COMMENT ‘IPAddress of the User’)
PARTITIONED BY(date STRING, country STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ’1′
STORED AS TEXTFILE;[王黎17]
载入内容,并指定分区标志
LOAD DATALOCAL INPATH /tmp/pv_2008-06-08_us.txt INTO TABLE page_view
PARTITION(date=’2008-06-08′,country=’US’);
查询指定标志的分区内容
SELECTpage_views.*
FROM page_views
WHERE
page_views.date >=’2008-03-01′ AND page_views.date <= ’2008-03-31′ AND page_views.referrer_urllike ‘%’;
10 JOIN
10.1 JOIN原则(这个一定要知道)
在使用写有 Join 操作的查询语句时有一条原则:应该将条目少的表/子查询放在 Join 操作符的左边。原因是在 Join 操作的 Reduce 阶段,位于 Join 操作符左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生OOM 错误的几率。对于一条语句中有多个 Join 的情况,如果 Join 的条件相同,比如查询:
INSERT OVERWRITE TABLE pv_users SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x ON (u.userid = x.userid);
如果 Join 的 key 相同,不管有多少个表,都会则会合并为一个 Map-Reduce
一个 Map-Reduce 任务,而不是 ‘n’ 个
在做 OUTER JOIN 的时候也是一样 如果 Join 的条件不相同,比如:
INSERT OVERWRITE TABLE pv_users SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x on (u.age = x.age);
Map-Reduce 的任务数目和Join 操作的数目是对应的,上述查询和以下查询是等价的:
INSERT OVERWRITE TABLE tmptable SELECT * FROM page_view p JOIN user u
ON (pv.userid = u.userid);
INSERT OVERWRITE TABLE pv_users
SELECT x.pageid, x.age FROM tmptable x
JOIN newuser y ON (x.age = y.age);
10.2 Map Join
Join 操作在 Map 阶段完成,不再需要Reduce,前提条件是需要的数据在 Map 的过程中可以访问到。比如查询:
INSERT OVERWRITE TABLE pv_users SELECT /*+ MAPJOIN(pv) */ pv.pageid, u.age
FROM page_view pv
JOIN user u ON (pv.userid = u.userid);
可以在 Map 阶段完成 Join,如图所示:
相关的参数为:
hive.join.emit.interval = 1000 How many rows in the right-most join operand Hive should buffer before emitting the join result.
hive.mapjoin.size.key = 10000
hive.mapjoin.cache.numrows = 10000
场景:如日志中,常会有信息丢失的问题,比如全网日志中的user_id,如果取其中的user_id和bmw_users关联,会碰到数据倾斜的问题。
解决方法1: user_id为空的不参与关联
Select * From log a
Join bmw_users b
On a.user_id is not null
And a.user_id = b.user_id
Union all
Select * from log a
where a.user_id is null;
解决方法2 :赋与空值分新的key值
Select *
from log a
left outer join bmw_users b
on case when a.user_id is null thenconcat(‘dp_hive’,rand() ) else a.user_id end = b.user_id;
结论:方法2比方法效率更好,不但io少了,而且作业数也少了。方法1 log读取两次,jobs是2。方法2 job数是1 。这个优化适合无效id(比如-99,’’,null等)产生的倾斜问题。把空值的key变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上 ,解决数据倾斜问题。附上Hadoop通用关联的实现方法(关联通过二次排序实现的,关联的列为parition key,关联的列c1和表的tag组成排序的group key,根据parition key分配reduce。同一reduce内根据group
key排序)
10.3 大表Join的数据偏斜
MapReduce编程模型下开发代码需要考虑数据偏斜的问题,Hive代码也是一样。数据偏斜的原因包括以下两点:
1. Map输出key数量极少,导致reduce端退化为单机作业。
2. Map输出key分布不均,少量key对应大量value,导致reduce端单机瓶颈。
Hive中我们使用MapJoin解决数据偏斜的问题,即将其中的某个表(全量)分发到所有Map端进行Join,从而避免了reduce。这要求分发的表可以被全量载入内存。
极限情况下,Join两边的表都是大表,就无法使用MapJoin。
这种问题最为棘手,目前已知的解决思路有两种:
1. 如果是上述情况1,考虑先对Join中的一个表去重,以此结果过滤无用信息。这样一般会将其中一个大表转化为小表,再使用MapJoin 。
一个实例是广告投放效果分析,例如将广告投放者信息表i中的信息填充到广告曝光日志表w中,使用投放者id关联。因为实际广告投放者数量很少(但是投放者信息表i很大),因此可以考虑先在w表中去重查询所有实际广告投放者id列表,以此Join过滤表i,这一结果必然是一个小表,就可以使用MapJoin。
2. 如果是上述情况2,考虑切分Join中的一个表为多片,以便将切片全部载入内存,然后采用多次MapJoin得到结果。
一个实例是商品浏览日志分析,例如将商品信息表i中的信息填充到商品浏览日志表w中,使用商品id关联。但是某些热卖商品浏览量很大,造成数据偏斜。例如,以下语句实现了一个inner join逻辑,将商品信息表拆分成2个表:
select * from
(
select w.id, w.time, w.amount, i1.name, i1.loc, i1.cat
from w left outer join i sampletable(1 out of 2 on id) i1
)
union all
(
select w.id, w.time, w.amount, i2.name, i2.loc, i2.cat
from w left outer join i sampletable(1 out of 2 on id) i2
)
);
以下语句实现了left outer join逻辑:
select t1.id, t1.time, t1.amount,
coalease(t1.name,t2.name),
coalease(t1.loc, t2.loc),
coalease(t1.cat, t2.cat)
from (
select w.id, w.time,w.amount, i1.name, i1.loc, i1.cat
from w left outer join isampletable(1 out of 2 on id) i1
) t1 left outer join i sampletable(2 out of 2 on id)t2;
上述语句使用Hive的sample table特性对表做切分。
11 不同数据类型关联产生数据倾斜
场景:一张表s8的日志,每个商品一条记录,要和商品表关联。但关联却碰到倾斜的问题。s8的日志中有字符串商品id,也有数字的商品id,类型是string的,但商品中的数字id是bigint的。猜测问题的原因是把s8的商品id转成数字id做hash来分配reduce,所以字符串id的s8日志,都到一个reduce上了,解决的方法验证了这个猜测。
解决方法:把数字类型转换成字符串类型
Select * from s8_log a
Left outer join r_auction_auctions b
On a.auction_id = cast(b.auction_id asstring);
12 合并小文件
文件数目过多,会给 HDFS 带来压力,并且会影响处理效率,可以通过合并 Map 和 Reduce 的结果文件来消除这样的影响:
rge.mapfiles = true 是否和并 Map 输出文件,默认为 True
rge.mapredfiles = false 是否合并 Reduce 输出文件,默认为 False
rge.size.per.task = 256*1000*1000 合并文件的大小
13 GroupBy ·
Map 端部分聚合:
并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在Map 端进行部分聚合,最后在 Reduce 端得出最终结果。
基于 Hash
参数包括:
§ hive.map.aggr = true 是否在 Map 端进行聚合,默认为 True
§ hive.groupby.mapaggr.checkinterval= 100000 在 Map 端进行聚合操作的条目数目
· 有数据倾斜的时候进行负载均衡
hive.groupby.skewindata= false
当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的Reduce 中,从而达到负载均衡的目的;第二个 MR Job再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的Group By Key 被分布到同一个 Reduce
中),最后完成最终的聚合操作。
以上是关于Hive 调优 小总结的主要内容,如果未能解决你的问题,请参考以下文章