HIVE优化场景七--数据倾斜--group by 倾斜
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HIVE优化场景七--数据倾斜--group by 倾斜相关的知识,希望对你有一定的参考价值。
参考技术A HIVE优化场景七--数据倾斜:GROUP BY 场景下的数据倾斜
JOIN 场景下的数据倾斜
1) 由于空值导致的数据倾斜问题
2) 由于数据类型不一致,导致的转换问题,导致的数据倾斜
3) 业务数据本身分布不均,导致的数据倾斜,下面4个小场景
i.大表与小表JOIN (Map JOIN)
ii.大表与大表JOIN, 一张表数据分布均匀,另一张表数据特定的KEY(有限几个) 分布不均
iii.大表与大表JOIN, 一张表数据分布均匀,另一张表大量的KEY 分布不均
iiii.大表与大表JOIN, 桶表,进行表拆分
group by 场景下的其实比较简单,我们只需要在 HIVE 中设置如下两个参数即可 :
set hive.map.aggr=true;
set hive.groupby.skewindata=true;
我们看下,设置这两个参数为什么能解决 GROUP BY 的数据倾斜问题
set hive.map.aggr=true; (默认 : true) 第一个参数表示在 Map 端进行预聚。 因为传到数据量小了,所以效率高了,可以缓解数据倾斜问题。
最主要的参数,其实是 set hive.groupby.skewindata=true;
这个参数有什么作用呢。这场来说 GROUP BY 流程只会产生一个MR JOB。但是,设置这个参数为 true 以后, 原来 GROUP BY 的 MR JOB 会由原来的一个变为两个。
流程如下:
JOB1 .第一个作业会进行预处理,将数据进行预聚合,并随机分发到 不同的 Reducer 中。
Map流程 : 会生成两个job来执行group by,第一个job中,各个map是平均读取分片的,在map阶段对这个分片中的数据根据group by 的key进行局部聚合操作,这里就相当于Combiner操作。
Shuffle流程:在第一次的job中,map输出的结果随机分区,这样就可以平均分到reduce中
Reduce流程: 在第一次的job中,reduce中按照group by的key进行分组后聚合,这样就在各个reduce中又进行了一次局部的聚合。
JOB2.读取上一个阶段MR的输出作为Map输入,并局部聚合。按照key分区,将数据分发到 Reduce 中,进行统计。
Map流程 : 因为第一个job中分区是随机的,所有reduce结果的数据的key也是随机的,所以第二个job的map读取的数据也是随机的key,所以第二个map中不存在数据倾斜的问题。
在第二个job的map中,也会进行一次局部聚合。
Shuffle流程 : 第二个job中分区是按照group by的key分区的,这个地方就保证了整体的group by没有问题,相同的key分到了同一个reduce中。
Reduce流程 :经过前面几个聚合的局部聚合,这个时候的数据量已经大大减少了,在最后一个reduce里进行最后的整体聚合。
SQL:
SELECT
pt,COUNT(1)
FROM datacube_salary_org
GROUP BY pt
;
开启前
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: datacube_salary_org
Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: pt (type: string)
outputColumnNames: pt
Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE
Group By Operator
aggregations: count(1)
keys: pt (type: string)
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
keys: KEY._col0 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
开启后:
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: datacube_salary_org
Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: pt (type: string)
outputColumnNames: pt
Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE
Group By Operator
aggregations: count(1)
keys: pt (type: string)
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: rand() (type: double)
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
keys: KEY._col0 (type: string)
mode: partials
outputColumnNames: _col0, _col1
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
keys: KEY._col0 (type: string)
mode: final
outputColumnNames: _col0, _col1
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
可以明显的看到开启优化后。增加了一层 JOB
对行进行排序时优化 Hive GROUP BY
【中文标题】对行进行排序时优化 Hive GROUP BY【英文标题】:Optimizing Hive GROUP BY when rows are sorted 【发布时间】:2016-12-20 17:15:48 【问题描述】:我有以下(非常简单的)Hive 查询:
select user_id, event_id, min(time) as start, max(time) as end,
count(*) as total, count(interaction == 1) as clicks
from events_all
group by user_id, event_id;
表格结构如下:
user_id event_id time interaction
Ex833Lli36nxTvGTA1Dv juCUv6EnkVundBHSBzQevw 1430481530295 0
Ex833Lli36nxTvGTA1Dv juCUv6EnkVundBHSBzQevw 1430481530295 1
n0w4uQhOuXymj5jLaCMQ G+Oj6J9Q1nI1tuosq2ZM/g 1430512179696 0
n0w4uQhOuXymj5jLaCMQ G+Oj6J9Q1nI1tuosq2ZM/g 1430512217124 0
n0w4uQhOuXymj5jLaCMQ mqf38Xd6CAQtuvuKc5NlWQ 1430512179696 1
我知道行首先按user_id
排序,然后按event_id
排序。
问题是:如果行已排序,有没有办法“提示”Hive 引擎来优化查询?优化的目的是避免将所有组保留在内存中,因为一次只需要保留一个组。
目前,在包含大约 300 GB 数据的 6 节点 16 GB Hadoop 集群中运行此查询大约需要 30 分钟,并且使用了大部分 RAM,导致系统阻塞。我知道每个组都会很小,每个 (user_id, event_id)
元组不超过 100 行,所以我认为优化的执行可能会占用非常小的内存并且速度更快(因为不需要循环组键)。
【问题讨论】:
附带说明,count(interaction == 1)
没有按我的预期工作,只计算具有 1 的行,而是返回与 count(*)
相同的行。
是的。聚合函数,包括 COUNT,忽略(仅)NULL 值并且 FALSE 不是 NULL
【参考方案1】:
创建一个分桶排序表。优化器会知道它是从元数据中排序的。 请参阅此处的示例(官方文档):https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-BucketedSortedTables
Count only interaction = 1:count(case when interaction=1 then 1 end) as clicks
- case 会将所有行标记为 1 或 null,并且只计算 1。
【讨论】:
感谢@leftjoin。有几件事:首先,我的表是一个外部表,有没有办法让它工作?其次,由于它是一个外部表,因此格式是固定的(制表符分隔值,\n 终止行),并且它没有不同的组终止符等。如果问的不多,您能否提供一个使用示例我发布的具体结构? @Alejandro Piad 另请阅读:grokbase.com/t/hive/user/133xgs10cb/bucketing-external-tables 很抱歉,您似乎必须创建分桶表并插入覆盖它,如果您在现有文本文件上创建外部表,它将无法正常工作。而且移动数据需要很长时间。 是的@leftjoin,我读过的所有内容都指向这一点。我接受这是正确的答案。以上是关于HIVE优化场景七--数据倾斜--group by 倾斜的主要内容,如果未能解决你的问题,请参考以下文章