spark数据分区数量的原理

Posted similarface

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark数据分区数量的原理相关的知识,希望对你有一定的参考价值。

 

 

原始RDD或数据集中的每一个分区都映射一个或多个数据文件, 该映射是在文件的一部分或者整个文件上完成的。

Spark Job RDD/datasets在执行管道中,通过根据分区到数据文件的映射读取数据输入到RDD/dataset。

 

如何根据某些参数确定spark的分区数?

影响数据分区数的参数:

(a)spark.default.parallelism (default: Total No. of CPU cores)
(b)spark.sql.files.maxPartitionBytes (default: 128 MB) 【读取文件时打包到单个分区中的最大字节数。】
(c)spark.sql.files.openCostInBytes (default: 4 MB)  【 该参数默认4M,表示小于4M的小文件会合并到一个分区中,用于减小小文件,防止太多单个小文件占一个分区情况。这个参数就是合并小文件的阈值,小于这个阈值的文件将会合并。】

 

使用这些配置参数值,一个名为maxSplitBytes的最大分割准则被计算如下:

 

maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore)

 

bytesPerCore = (文件总大小 + 文件个数 * openCostInBytes)/ default.parallelism

 

maxSplitBytes: 

for each_file in files:
    if each_file is can split:
        if each_file.size() > maxSplitBytes:
            # file 被切分为 block_number 块其中block_number-1大小为 maxSplitBytes,1块<=maxSplitBytes
            block_number = ceil(each_file.size() / maxSplitBytes)
        else:
            block_number = 1
    else:
        #文件不可分
        block_number = 1

 

数据文件计算文件块之后,将一个或多个文件块打包到一个分区中。

打包过程从初始化一个空分区开始,然后对每个文件块进行迭代:

1. 如果没有当前分区要打包,请初始化要打包的新分区,然后将迭代的文件块分配给该分区。 分区大小成为块大小与“ openCostInBytes”的额外开销的总和。

2.如果块大小的增加不超过当前分区(正在打包)的大小超过‘ maxSplitBytes ‘,那么文件块将成为当前分区的一部分。分区大小是由块大小和“openCostInBytes”额外开销的总和增加的。

3.如果块大小的增加超过了当前分区被打包的大小超过了‘ maxSplitBytes ‘,那么当前分区被声明为完整并启动一个新分区。迭代的文件块成为正在初始化的新分区的一部分,而新分区大小成为块大小和‘openCostInBytes’额外开销的总和。

打包过程结束后,将获得用于读取相应数据文件的数据集的分区数。

 

技术图片

 

 

 

 

尽管获得分区数量的过程似乎有点复杂,但基本的思想是,如果文件是可分拆的,那么首先在maxSplitBytes边界处拆分单个文件。

在此之后,将文件的分割块或不可分割的文件打包到一个分区中,这样,在将块打包到一个分区中期间,

如果分区大小超过maxSplitBytes,则认为该分区已经打包完成,然后采用一个新分区进行打包。因此,最终从包装过程中得到一定数量的分区。

e.g:

core设置为10

(a) 54 parquet files, 65 MB each, 默认参数 。  

bytesPerCore = (54*65 + 54 * 4)/ 10 = 372M

maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,372M)=128

65 < 128 && 2*65 > 128 ==> 54分区

 

(b)54 parquet files, 63 MB each, 默认参数。

bytesPerCore = (54*63 + 54 * 4)/ 10 = 361M

maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,361M)=128

63 < 128 &&   4 + 2*63=126+4=130 > 128=maxPartitionBytes  ==> 54 (看起来 1分区可以容纳2个块,但是存在一个openCostInBytes开销4M,2个63+4大于了 128M,故一个分区只能一个块)

 

(c)54 parquet files, 40 MB each, 默认参数。

bytesPerCore = (54*40 + 54 * 4)/ 10 = 237M

maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,237M)=128

40 < 128 && (4+3*  40) = 124 < 128 (故一个分区可以装3个块) = 54/3 = 18分区

 

(d)54 parquet files, 40 MB each, maxPartitionBytes=88M 其余默认

bytesPerCore = (54*40 + 54 * 4)/ 10 = 237M

maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(88M,237M)=88

40 < 88 && (4+2*40) = 84 < 88 (一个分区2个) = 27个分区

 

(e) 54 parquet files, 40 MB each ; spark.default.parallelism set to 400

bytesPerCore = (54*40 + 54 * 4)/ 400 = 5M

maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(88M,5M)=5

总工块数:  54* 40 / 5

 

 

 

以上是关于spark数据分区数量的原理的主要内容,如果未能解决你的问题,请参考以下文章

spark04-文件读取分区数据分配原理

spark02-内存数据分区分配原理

Spark---并行度和分区

Apache Spark 内存不足,分区数量较少

为啥 Spark DataFrame 会创建错误数量的分区?

Spark-sql读取hive分区表限制分区过滤条件及限制分区数量