HIVE优化场景七--数据倾斜--group by 倾斜

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HIVE优化场景七--数据倾斜--group by 倾斜相关的知识,希望对你有一定的参考价值。

参考技术A HIVE优化场景七--数据倾斜:

GROUP BY 场景下的数据倾斜

JOIN 场景下的数据倾斜

1) 由于空值导致的数据倾斜问题

2) 由于数据类型不一致,导致的转换问题,导致的数据倾斜

3) 业务数据本身分布不均,导致的数据倾斜,下面4个小场景

i.大表与小表JOIN (Map JOIN)

ii.大表与大表JOIN, 一张表数据分布均匀,另一张表数据特定的KEY(有限几个) 分布不均

iii.大表与大表JOIN, 一张表数据分布均匀,另一张表大量的KEY 分布不均

iiii.大表与大表JOIN, 桶表,进行表拆分

group by 场景下的其实比较简单,我们只需要在 HIVE 中设置如下两个参数即可 :

set hive.map.aggr=true;

set hive.groupby.skewindata=true;

我们看下,设置这两个参数为什么能解决 GROUP BY 的数据倾斜问题

set hive.map.aggr=true; (默认 : true)   第一个参数表示在 Map 端进行预聚。 因为传到数据量小了,所以效率高了,可以缓解数据倾斜问题。

最主要的参数,其实是 set hive.groupby.skewindata=true;  

这个参数有什么作用呢。这场来说 GROUP BY 流程只会产生一个MR JOB。但是,设置这个参数为 true 以后, 原来 GROUP BY 的 MR JOB 会由原来的一个变为两个。

流程如下:

JOB1 .第一个作业会进行预处理,将数据进行预聚合,并随机分发到 不同的 Reducer 中。

      Map流程 : 会生成两个job来执行group by,第一个job中,各个map是平均读取分片的,在map阶段对这个分片中的数据根据group by 的key进行局部聚合操作,这里就相当于Combiner操作。

     Shuffle流程:在第一次的job中,map输出的结果随机分区,这样就可以平均分到reduce中  

     Reduce流程:  在第一次的job中,reduce中按照group by的key进行分组后聚合,这样就在各个reduce中又进行了一次局部的聚合。

JOB2.读取上一个阶段MR的输出作为Map输入,并局部聚合。按照key分区,将数据分发到 Reduce 中,进行统计。

      Map流程 : 因为第一个job中分区是随机的,所有reduce结果的数据的key也是随机的,所以第二个job的map读取的数据也是随机的key,所以第二个map中不存在数据倾斜的问题。

在第二个job的map中,也会进行一次局部聚合。

    Shuffle流程 :  第二个job中分区是按照group by的key分区的,这个地方就保证了整体的group by没有问题,相同的key分到了同一个reduce中。

    Reduce流程 :经过前面几个聚合的局部聚合,这个时候的数据量已经大大减少了,在最后一个reduce里进行最后的整体聚合。

SQL:

SELECT 

 pt,COUNT(1)

FROM datacube_salary_org

GROUP BY pt

;

开启前

STAGE DEPENDENCIES:                               

  Stage-1 is a root stage                         

  Stage-0 depends on stages: Stage-1             

STAGE PLANS:                                     

  Stage: Stage-1                                 

    Map Reduce                                   

      Map Operator Tree:                         

          TableScan                               

            alias: datacube_salary_org           

            Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE

            Select Operator                       

              expressions: pt (type: string)     

              outputColumnNames: pt               

              Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE

              Group By Operator                   

                aggregations: count(1)           

                keys: pt (type: string)           

                mode: hash                       

                outputColumnNames: _col0, _col1   

                Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

                Reduce Output Operator           

                  key expressions: _col0 (type: string)

                  sort order: +                   

                  Map-reduce partition columns: _col0 (type: string)

                  Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

                  value expressions: _col1 (type: bigint)

      Reduce Operator Tree:                       

        Group By Operator                         

          aggregations: count(VALUE._col0)       

          keys: KEY._col0 (type: string)         

          mode: mergepartial                     

          outputColumnNames: _col0, _col1         

          Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

          File Output Operator                   

            compressed: false                     

            Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

            table:                               

                input format: org.apache.hadoop.mapred.SequenceFileInputFormat

                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0                                 

    Fetch Operator                               

      limit: -1                                   

      Processor Tree:                             

        ListSink                                 

开启后:

STAGE DEPENDENCIES:                               

  Stage-1 is a root stage                         

  Stage-2 depends on stages: Stage-1             

  Stage-0 depends on stages: Stage-2             

STAGE PLANS:                                     

  Stage: Stage-1                                 

    Map Reduce                                   

      Map Operator Tree:                         

          TableScan                               

            alias: datacube_salary_org           

            Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE

            Select Operator                       

              expressions: pt (type: string)     

              outputColumnNames: pt               

              Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE

              Group By Operator                   

                aggregations: count(1)           

                keys: pt (type: string)           

                mode: hash                       

                outputColumnNames: _col0, _col1   

                Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

                Reduce Output Operator           

                  key expressions: _col0 (type: string)

                  sort order: +                   

                  Map-reduce partition columns: rand() (type: double)

                  Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

                  value expressions: _col1 (type: bigint)

      Reduce Operator Tree:                       

        Group By Operator                         

          aggregations: count(VALUE._col0)       

          keys: KEY._col0 (type: string)         

          mode: partials                         

          outputColumnNames: _col0, _col1         

          Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

          File Output Operator                   

            compressed: false                     

            table:                               

                input format: org.apache.hadoop.mapred.SequenceFileInputFormat

                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2                                 

    Map Reduce                                   

      Map Operator Tree:                         

          TableScan                               

            Reduce Output Operator               

              key expressions: _col0 (type: string)

              sort order: +                       

              Map-reduce partition columns: _col0 (type: string)

              Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

              value expressions: _col1 (type: bigint)

      Reduce Operator Tree:                       

        Group By Operator                         

          aggregations: count(VALUE._col0)       

          keys: KEY._col0 (type: string)         

          mode: final                             

          outputColumnNames: _col0, _col1         

          Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

          File Output Operator                   

            compressed: false                     

            Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

            table:                               

                input format: org.apache.hadoop.mapred.SequenceFileInputFormat

                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0                                 

    Fetch Operator                               

      limit: -1                                   

      Processor Tree:                             

        ListSink                                 

可以明显的看到开启优化后。增加了一层 JOB 

对行进行排序时优化 Hive GROUP BY

【中文标题】对行进行排序时优化 Hive GROUP BY【英文标题】:Optimizing Hive GROUP BY when rows are sorted 【发布时间】:2016-12-20 17:15:48 【问题描述】:

我有以下(非常简单的)Hive 查询:

select user_id, event_id, min(time) as start, max(time) as end,
       count(*) as total, count(interaction == 1) as clicks
from events_all
group by user_id, event_id;

表格结构如下:

user_id                 event_id                time            interaction 
Ex833Lli36nxTvGTA1Dv    juCUv6EnkVundBHSBzQevw  1430481530295   0
Ex833Lli36nxTvGTA1Dv    juCUv6EnkVundBHSBzQevw  1430481530295   1
n0w4uQhOuXymj5jLaCMQ    G+Oj6J9Q1nI1tuosq2ZM/g  1430512179696   0
n0w4uQhOuXymj5jLaCMQ    G+Oj6J9Q1nI1tuosq2ZM/g  1430512217124   0
n0w4uQhOuXymj5jLaCMQ    mqf38Xd6CAQtuvuKc5NlWQ  1430512179696   1

我知道行首先按user_id 排序,然后按event_id 排序。

问题是:如果行已排序,有没有办法“提示”Hive 引擎来优化查询?优化的目的是避免将所有组保留在内存中,因为一次只需要保留一个组。

目前,在包含大约 300 GB 数据的 6 节点 16 GB Hadoop 集群中运行此查询大约需要 30 分钟,并且使用了大部分 RAM,导致系统阻塞。我知道每个组都会很小,每个 (user_id, event_id) 元组不超过 100 行,所以我认为优化的执行可能会占用非常小的内存并且速度更快(因为不需要循环组键)。

【问题讨论】:

附带说明,count(interaction == 1) 没有按我的预期工作,只计算具有 1 的行,而是返回与 count(*) 相同的行。 是的。聚合函数,包括 COUNT,忽略(仅)NULL 值并且 FALSE 不是 NULL 【参考方案1】:

创建一个分桶排序表。优化器会知道它是从元数据中排序的。 请参阅此处的示例(官方文档):https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-BucketedSortedTables

Count only interaction = 1:count(case when interaction=1 then 1 end) as clicks - case 会将所有行标记为 1 或 null,并且只计算 1。

【讨论】:

感谢@leftjoin。有几件事:首先,我的表是一个外部表,有没有办法让它工作?其次,由于它是一个外部表,因此格式是固定的(制表符分隔值,\n 终止行),并且它没有不同的组终止符等。如果问的不多,您能否提供一个使用示例我发布的具体结构? @Alejandro Piad 另请阅读:grokbase.com/t/hive/user/133xgs10cb/bucketing-external-tables 很抱歉,您似乎必须创建分桶表并插入覆盖它,如果您在现有文本文件上创建外部表,它将无法正常工作。而且移动数据需要很长时间。 是的@leftjoin,我读过的所有内容都指向这一点。我接受这是正确的答案。

以上是关于HIVE优化场景七--数据倾斜--group by 倾斜的主要内容,如果未能解决你的问题,请参考以下文章

Hive数仓-数据倾斜优化

HIVE SQL 优化之数据倾斜

大数据面试题----HIVE的调优及数据倾斜

hive sql 优化 - 2.0

对行进行排序时优化 Hive GROUP BY

hive distribute by 和group by 的区别