限制 spark.read pyspark 的分区数

Posted

技术标签:

【中文标题】限制 spark.read pyspark 的分区数【英文标题】:Limit number of partitions for spark.read pyspark 【发布时间】:2021-05-20 07:22:54 【问题描述】:

在我使用 spark 读取 xml 文件后:

df = spark.read\
        .format("xml")\
        .options(**options)\
        .load("s3a://.../.../")

我用df.rdd.getNumPartitions()查看了分区数,得到了20081。

如何在开始时限制分区的数量,这样我以后就不用coalesce()了?有这么多分区的问题是由于每个分区在df.write 期间创建一个文件,并且每次运行此进程时在 s3 中创建 20081 个新的非常小的文件是非常糟糕的做法。

【问题讨论】:

【参考方案1】:

spark.read 生成的 Dataframe 将始终与分区数和文件数相匹配,因为每个文件都将由专用任务读取。

如果您需要更频繁地运行此过程,我宁愿使用 coalescerepartition 将那些原始 20000 个文件消耗并一次复制到较小的文件中。然后,对这些文件的所有后续读取都将产生一个具有较小分区的 Dataframe。

【讨论】:

【参考方案2】:

分区数由DataSourceScanExec通过一个有点复杂的公式计算得出。但是为了简化它,尝试增加这个值spark.sql.files.maxPartitionBytes,默认是134217728 (128 MB)。试着把它变大,你会看到不同的。

spark.conf.set('spark.sql.files.maxPartitionBytes', '1073741824') # 1 GB

【讨论】:

以上是关于限制 spark.read pyspark 的分区数的主要内容,如果未能解决你的问题,请参考以下文章

在 pyspark 中过滤 Hive 分区表

逻辑和物理计划如何工作时读蜂巢分区表在兽人pyspark dataframe吗

手动选择镶木地板分区与在 pyspark 中过滤它们

如何使用 spark.read.jdbc 读取不同 Pyspark 数据帧中的多个文件

Pyspark spark.read.csv().collect() 返回一个空列表

spark.read.options(header=True, delimiter="|").csv("mycsv") PySpark 中的 3 行花费了太多时