限制 spark.read pyspark 的分区数
Posted
技术标签:
【中文标题】限制 spark.read pyspark 的分区数【英文标题】:Limit number of partitions for spark.read pyspark 【发布时间】:2021-05-20 07:22:54 【问题描述】:在我使用 spark 读取 xml 文件后:
df = spark.read\
.format("xml")\
.options(**options)\
.load("s3a://.../.../")
我用df.rdd.getNumPartitions()
查看了分区数,得到了20081。
如何在开始时限制分区的数量,这样我以后就不用coalesce()
了?有这么多分区的问题是由于每个分区在df.write
期间创建一个文件,并且每次运行此进程时在 s3 中创建 20081 个新的非常小的文件是非常糟糕的做法。
【问题讨论】:
【参考方案1】:spark.read
生成的 Dataframe 将始终与分区数和文件数相匹配,因为每个文件都将由专用任务读取。
如果您需要更频繁地运行此过程,我宁愿使用 coalesce
或 repartition
将那些原始 20000 个文件消耗并一次复制到较小的文件中。然后,对这些文件的所有后续读取都将产生一个具有较小分区的 Dataframe。
【讨论】:
【参考方案2】:分区数由DataSourceScanExec通过一个有点复杂的公式计算得出。但是为了简化它,尝试增加这个值spark.sql.files.maxPartitionBytes
,默认是134217728
(128 MB)。试着把它变大,你会看到不同的。
spark.conf.set('spark.sql.files.maxPartitionBytes', '1073741824') # 1 GB
【讨论】:
以上是关于限制 spark.read pyspark 的分区数的主要内容,如果未能解决你的问题,请参考以下文章
逻辑和物理计划如何工作时读蜂巢分区表在兽人pyspark dataframe吗
如何使用 spark.read.jdbc 读取不同 Pyspark 数据帧中的多个文件
Pyspark spark.read.csv().collect() 返回一个空列表
spark.read.options(header=True, delimiter="|").csv("mycsv") PySpark 中的 3 行花费了太多时