SPARK Expand问题的解决(由count distinctgroup setscuberollup引起的)

Posted 鸿乃江边鸟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SPARK Expand问题的解决(由count distinctgroup setscuberollup引起的)相关的知识,希望对你有一定的参考价值。

背景

本文基于spark 3.1.2
我们知道spark对于count(distinct)/group sets 以及cube、rollup的处理都是采用转换为Expand的方法处理,
这样做的优点就是在数据量小的情况下,能有以空间换时间,从而达到加速的目的。
但是弊端也是很明显,就是在数据量较大的情况下,而且expand的倍数达到上百倍或者千倍的时候,这任务运行的时间很长(这在数分中是非常常见的)。

分析

先来看一组图:

是不是很刺激,数据从2,635,978,109直接扩张到了168,702,598,976,将近80倍。
该sql就是简单的读取表让后group by cube,如下:

该sql运行的时长达到了5个小时,如下:

经过优化后,该sql只需要49分钟,如下:

其实解决方法很简单,因为我们读取的是parquet的文件,且依赖的表的文件个数有400个,但是优化前的任务数是99个,所以我们可以设置spark.sql.files.maxPartitionBytes的值来控制每个task任务读取的数据大小,笔者是设置为20MB。具体spark是怎么读取parquet文件的可以参考Spark-读取Parquet-为什么task数量会多于Row Group的数量

结论

这种expand问题解决的思路也是有的:

  1. 设置spark.sql.files.maxPartitionBytes为合适的值,这种只适合直接依赖于表的情况(不适用子查询)
  2. 参考SPARK-32542,这种只适合group sets的情况,有可能会导致ExchangeExec过多的问题
  3. repartition 中间结果表,再拿中间临时结果作为依赖表,这种如果依赖的表很多,需要建立很多的临时表,比较繁琐
     create table temp_a select /*+ repartition(1000) */ from fackt_table
     select columns from temp_a group by cube()
    
  4. 修改spark源码从源码底层支持(后续文章会说到)

以上是关于SPARK Expand问题的解决(由count distinctgroup setscuberollup引起的)的主要内容,如果未能解决你的问题,请参考以下文章

如何解决嵌套地图函数中的 SPARK-5063

在spark中遇到了奇怪的错误,找到了奇怪的解决方法

使用 hive 和 spark 选择 count(*) 问题

spark-配置

是否有任何性能问题迫使在 spark 中使用 count 进行急切评估?

Windows上的Spark--rdd.count()不起作用[重复]