Hive optimization on count distinct multiple columns with groupby

Posted: 2019-05-31 05:39:28

Problem description:

I am optimizing Hive (1.4-cdh) code that runs on MapReduce. In my project we use many count distinct operations together with a GROUP BY clause; a sample HQL is shown below.

DROP TABLE IF EXISTS testdb.NewTable PURGE;
CREATE TABLE testdb.NewTable AS
SELECT a.*
FROM (
    SELECT col1,
           COUNT(DISTINCT col2) AS col2,
           COUNT(DISTINCT col3) AS col3,
           COUNT(DISTINCT col4) AS col4,
           COUNT(DISTINCT col5) AS col5
    FROM BaseTable
    GROUP BY col1
) a
WHERE a.col3 > 1 OR a.col4 > 1 OR a.col2 > 1 OR a.col5 > 1;

Could you suggest a better approach to this problem, so as to minimize the query's processing time?

Adding the EXPLAIN plans for both the COUNT(DISTINCT) and collect_set variants:
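(The plans below were presumably generated with Hive's EXPLAIN statement; a minimal sketch for the COUNT(DISTINCT) variant, reusing the question's table and column names:)

    EXPLAIN
    SELECT col1,
           COUNT(DISTINCT col2) AS col2,
           COUNT(DISTINCT col3) AS col3,
           COUNT(DISTINCT col4) AS col4,
           COUNT(DISTINCT col5) AS col5
    FROM BaseTable
    GROUP BY col1;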

COUNT(DISTINCT) explain plan:

OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: BaseTable
            Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: col1 (type: string), col2 (type: decimal(3,0)), col3 (type: string), col4 (type: string), col5 (type: string)
              outputColumnNames: col1, col2, col3, col4, col5
              Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count(DISTINCT col5), count(DISTINCT col2), count(DISTINCT col4), count(DISTINCT col3)
                keys: col1 (type: string), col5 (type: string), col2 (type: decimal(3,0)), col4 (type: string), col3 (type: string)
                mode: hash
                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8
                Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: decimal(3,0)), _col3 (type: string), _col4 (type: string)
                  sort order: +++++
                  Map-reduce partition columns: _col0 (type: string)
                  Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0), count(DISTINCT KEY._col1:2._col0), count(DISTINCT KEY._col1:3._col0)
          keys: KEY._col0 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2, _col3, _col4
          Statistics: Num rows: 8431554627 Data size: 1306983356533 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
            predicate: ((((_col2 > 1) or (_col3 > 1)) or (_col1 > 1)) or (_col4 > 1)) (type: boolean)
            Statistics: Num rows: 8431554627 Data size: 1306983356533 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 8431554627 Data size: 1306983356533 Basic stats: COMPLETE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

collect_set explain plan:

OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: BaseTable
            Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: col1 (type: string), col2 (type: decimal(3,0)), col3 (type: string), col4 (type: string), col5 (type: string)
              outputColumnNames: col1, col2, col3, col4, col5
              Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: collect_set(col5), collect_set(col2), collect_set(col4), collect_set(col3)
                keys: col1 (type: string)
                mode: hash
                outputColumnNames: _col0, _col1, _col2, _col3, _col4
                Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string)
                  sort order: +
                  Map-reduce partition columns: _col0 (type: string)
                  Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col1 (type: array<string>), _col2 (type: array<decimal(3,0)>), _col3 (type: array<string>), _col4 (type: array<string>)
      Reduce Operator Tree:
        Group By Operator
          aggregations: collect_set(VALUE._col0), collect_set(VALUE._col1), collect_set(VALUE._col2), collect_set(VALUE._col3)
          keys: KEY._col0 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2, _col3, _col4
          Statistics: Num rows: 8431554627 Data size: 1306983356533 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            expressions: _col0 (type: string), size(_col1) (type: int), size(_col2) (type: int), size(_col3) (type: int), size(_col4) (type: int)
            outputColumnNames: _col0, _col1, _col2, _col3, _col4
            Statistics: Num rows: 8431554627 Data size: 1306983356533 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: ((((_col2 > 1) or (_col3 > 1)) or (_col1 > 1)) or (_col4 > 1)) (type: boolean)
              Statistics: Num rows: 8431554627 Data size: 1306983356533 Basic stats: COMPLETE Column stats: NONE
              File Output Operator
                compressed: false
                Statistics: Num rows: 8431554627 Data size: 1306983356533 Basic stats: COMPLETE Column stats: NONE
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink


Answer 1:

Try using collect_set; it collects the distinct values, excluding NULLs.

CREATE TABLE testdb.NewTable AS
SELECT a.*
FROM (
    SELECT col1,
           size(collect_set(col2)) AS col2,
           size(collect_set(col3)) AS col3,
           size(collect_set(col4)) AS col4,
           size(collect_set(col5)) AS col5
    FROM BaseTable
    GROUP BY col1
) a
WHERE a.col3 > 1 OR a.col4 > 1 OR a.col2 > 1 OR a.col5 > 1;

Comments:

Thanks for your reply. I tried your answer, and it added more processing time compared with count(distinct).

@nilesh1212 What exactly is slow? Please provide the execution logs and the plans (EXPLAIN output).

With collect_set: Hadoop job information for Stage-1: number of mappers: 660; number of reducers: 400. Stage-2: number of mappers: 313; number of reducers: 297. Stage-4: number of mappers: 1; number of reducers: 0. Total time: 3159.383 seconds. With count(distinct): Hadoop job information for Stage-1: number of mappers: 659; number of reducers: 400. Stage-3: number of mappers: 1; number of reducers: 0. Total time: 2306.815 seconds.

@nilesh1212 The two plans look the same. Is the parameter hive.map.aggr set to true? It appears so, since aggregation also runs on the mappers. I notice that in the collect_set case there are fewer mappers and reducers. Try increasing mapper and reducer parallelism: ***.com/a/48296562/2700344 — lower the figures until more mappers and reducers run; that may help.

Yes, hive.map.aggr=true. Do I need to set it to false and try your answer?
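(For reference, the parallelism tuning suggested in the last comments is done through session-level settings; a hedged sketch follows, where the specific byte values are illustrative assumptions, not taken from the thread:)

    -- Map-side aggregation; the thread confirms this is already true.
    set hive.map.aggr=true;
    -- More mappers: lower the maximum input split size, in bytes.
    set mapreduce.input.fileinputformat.split.maxsize=67108864;
    -- More reducers: lower the data volume each reducer handles, in bytes.
    set hive.exec.reducers.bytes.per.reducer=67108864;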
