Where Spark SQL optimizes better than Hive SQL (window functions)

Posted by 小萝卜算子

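Sometimes a single select statement contains several window functions, and their window definitions (the OVER clause) may be identical or may differ.

For identical windows there is no need to partition and sort a second time; the functions can be merged into a single Window operator.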

select 
    id,
    sq,
    cell_type,
    rank,
    row_number() over(partition by id  order by rank ) naturl_rank,
    rank() over(partition by id order by rank) as r,
    dense_rank() over(partition by  cell_type order by id) as dr  
 from window_test_table 
 group by id,sq,cell_type,rank;

row_number() and rank() use the same window, so both can be computed in a single partition-and-sort pass. Hive SQL and Spark SQL behave the same way here.
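To make the merging rule concrete, here is a minimal Python sketch that buckets the three window functions of the query above by their window definition. It is a toy model only, not how either engine's planner is implemented; the helper name group_by_window_spec and the tuple layout are assumptions made for illustration.

```python
from collections import defaultdict

# Each window function from the first query, described by its
# (partition-by columns, order-by columns) pair.
window_functions = [
    ("row_number()", ("id",), ("rank",)),
    ("rank()", ("id",), ("rank",)),
    ("dense_rank()", ("cell_type",), ("id",)),
]


def group_by_window_spec(functions):
    """Bucket functions that share the exact same partition and order spec."""
    groups = defaultdict(list)
    for name, partition_by, order_by in functions:
        groups[(partition_by, order_by)].append(name)
    return dict(groups)


groups = group_by_window_spec(window_functions)
for (partition_by, order_by), names in groups.items():
    print(f"partition by {partition_by}, order by {order_by}: {names}")
```

Two buckets come out: row_number() and rank() share one, while dense_rank() needs its own partitioning and sort.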

The following case is different:

select
    id,
    rank,
    row_number() over(partition by id  order by rank ) naturl_rank,
    sum(rank) over(partition by id) as snum
 from window_test_table

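Although these two windows are not identical, sum(rank) does not depend on the order of rows within a partition, so it can reuse the partitioning already done for row_number().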
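The reasoning can be checked with a small Python sketch: once the rows are partitioned by id and sorted by rank for row_number(), the per-partition sum falls out of the same pass. The sample rows and the function name row_number_and_sum are hypothetical and only serve the illustration.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical sample rows: (id, rank).
rows = [(2, 5), (1, 30), (1, 10), (2, 7), (1, 20)]


def row_number_and_sum(rows):
    """Compute row_number() and sum(rank) per id from one sorted pass."""
    ordered = sorted(rows, key=itemgetter(0, 1))  # one sort on (id, rank)
    result = []
    for id_, group in groupby(ordered, key=itemgetter(0)):
        partition = list(group)
        # The sum ignores row order, so the order chosen for row_number() is fine.
        total = sum(rank for _, rank in partition)
        for position, (_, rank) in enumerate(partition, start=1):
            result.append((id_, rank, position, total))
    return result


result = row_number_and_sum(rows)
for row in result:
    print(row)  # (id, rank, naturl_rank, snum)
```

Only one partition-and-sort step is needed here, which is the saving an optimizer can aim for when it recognizes that the second window is compatible with the first.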

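The execution plans below show the difference. Spark SQL still emits two Window operators, but both sit on top of a single Exchange (one shuffle on id), so sum(rank) and row_number() share the same partitioning. Hive SQL does not: it runs two MapReduce stages (Stage-1 and Stage-2), each with its own shuffle and sort.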

Spark SQL execution plan:

spark-sql>  explain select  id,rank,row_number() over(partition by id  order by rank ) naturl_rank,sum(rank) over(partition by id) as snum from window_test_table;
      
== Physical Plan ==
*(3) Project [id#13, rank#16, naturl_rank#8, snum#9L]
+- Window [row_number() windowspecdefinition(id#13, rank#16 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS naturl_rank#8], [id#13], [rank#16 ASC NULLS FIRST]
   +- *(2) Sort [id#13 ASC NULLS FIRST, rank#16 ASC NULLS FIRST], false, 0
      +- Window [sum(cast(rank#16 as bigint)) windowspecdefinition(id#13, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS snum#9L], [id#13]
         +- *(1) Sort [id#13 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#13, 200)
               +- Scan hive tmp.window_test_table [id#13, rank#16], HiveTableRelation `tmp`.`window_test_table`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#13, sq#14, cell_type#15, rank#16]
Time taken: 0.278 seconds, Fetched 1 row(s)

Hive SQL execution plan:

hive> explain select  id,rank,row_number() over(partition by id  order by rank ) naturl_rank,sum(rank) over(partition by id) as snum from window_test_table;
​
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2
​
STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: window_test_table
            Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
            Reduce Output Operator
              key expressions: id (type: int), rank (type: int)
              sort order: ++
              Map-reduce partition columns: id (type: int)
              Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Select Operator
          expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: int)
          outputColumnNames: _col0, _col3
          Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
          PTF Operator
            Function definitions:
                Input definition
                  input alias: ptf_0
                  output shape: _col0: int, _col3: int
                  type: WINDOWING
                Windowing table definition
                  input alias: ptf_1
                  name: windowingtablefunction
                  order by: _col3 ASC NULLS FIRST
                  partition by: _col0
                  raw input shape:
                  window functions:
                      window function definition
                        alias: row_number_window_0
                        name: row_number
                        window function: GenericUDAFRowNumberEvaluator
                        window frame: PRECEDING(MAX)~FOLLOWING(MAX)
                        isPivotResult: true
            Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: _col0 (type: int), _col3 (type: int), row_number_window_0 (type: int)
              outputColumnNames: _col0, _col3, row_number_window_0
              Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
              File Output Operator
                compressed: false
                table:
                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
​
  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col0 (type: int)
              sort order: +
              Map-reduce partition columns: _col0 (type: int)
              Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
              value expressions: row_number_window_0 (type: int), _col3 (type: int)
      Reduce Operator Tree:
        Select Operator
          expressions: VALUE._col0 (type: int), KEY.reducesinkkey0 (type: int), VALUE._col3 (type: int)
          outputColumnNames: _col0, _col1, _col4
          Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
          PTF Operator
            Function definitions:
                Input definition
                  input alias: ptf_0
                  output shape: _col0: int, _col1: int, _col4: int
                  type: WINDOWING
                Windowing table definition
                  input alias: ptf_1
                  name: windowingtablefunction
                  order by: _col1 ASC NULLS FIRST
                  partition by: _col1
                  raw input shape:
                  window functions:
                      window function definition
                        alias: sum_window_1
                        arguments: _col4
                        name: sum
                        window function: GenericUDAFSumLong
                        window frame: PRECEDING(MAX)~FOLLOWING(MAX)
            Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: _col1 (type: int), _col4 (type: int), _col0 (type: int), sum_window_1 (type: bigint)
              outputColumnNames: _col0, _col1, _col2, _col3
              Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
              File Output Operator
                compressed: false
                Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
                table:
                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
​
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
​
Time taken: 0.244 seconds, Fetched: 106 row(s)
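One possible workaround on the Hive side, offered as an untested idea rather than a confirmed fix, is to give sum(rank) the same partition by and order by as row_number() and add an explicit full-partition frame, so that both functions carry an identical window definition. The sketch below only checks that the rewrite returns the same rows. It uses Python's built-in sqlite3 module as a stand-in engine (assuming the bundled SQLite is 3.25 or newer, which is when window functions were added) and hypothetical sample data. Whether Hive then plans a single stage has to be confirmed with explain on a real cluster.

```python
import sqlite3

# In-memory stand-in table with hypothetical sample data.
conn = sqlite3.connect(":memory:")
conn.execute("create table window_test_table (id int, rank int)")
conn.executemany(
    "insert into window_test_table values (?, ?)",
    [(1, 10), (1, 20), (1, 30), (2, 5), (2, 7)],
)

# The query from the article: two different window definitions.
original = """
select id, rank,
       row_number() over (partition by id order by rank) as naturl_rank,
       sum(rank) over (partition by id) as snum
from window_test_table
order by id, rank
"""

# Rewrite: same partition/order spec for both functions, with an
# explicit frame covering the whole partition so the sum is unchanged.
rewritten = """
select id, rank,
       row_number() over (partition by id order by rank) as naturl_rank,
       sum(rank) over (partition by id order by rank
                       rows between unbounded preceding
                                and unbounded following) as snum
from window_test_table
order by id, rank
"""

original_rows = conn.execute(original).fetchall()
rewritten_rows = conn.execute(rewritten).fetchall()
print(original_rows == rewritten_rows)
```

The explicit frame matters: with an order by and no frame clause, the default frame ends at the current row and sum(rank) would turn into a running total.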
