重新分区 pyspark 数据帧失败以及如何避免初始分区大小
Posted
技术标签:
【中文标题】重新分区 pyspark 数据帧失败以及如何避免初始分区大小【英文标题】:Repartitioning a pyspark dataframe fails and how to avoid the initial partition size 【发布时间】:2019-02-25 11:51:14 【问题描述】:我正在尝试通过在 spark 数据帧上使用分区来调整 spark 的性能。代码如下:
file_path1 = spark.read.parquet(*paths[:15])
df = file_path1.select(columns) \
.where((func.col("organization") == organization))
df = df.repartition(10)
#execute an action just to make spark execute the repartition step
df.first()
在执行first()
期间,我检查了 Spark UI 中的作业阶段,并在此处找到了以下内容:
repartition
步骤?
为什么还有第 8 阶段? 我只请求了first()
的一项操作。是不是因为repartition
引起的shuffle?
有没有办法改变 parquet 文件的重新分区而不必进行此类操作? 最初当我阅读df
时,您可以看到它已分区超过 43k 分区,这确实很多(与我将其保存到 csv 文件时的大小相比:4 MB,13k 行)并在进一步的步骤中产生问题,这就是我想重新分区的原因。
我应该在重新分区后使用cache()
吗? df = df.repartition(10).cache()
?当我第二次执行df.first()
时,我也得到了一个有 43k 分区的预定阶段,尽管df.rdd.getNumPartitions()
返回了 10。
编辑:分区的数量只是为了尝试。我的问题旨在帮助我了解如何进行正确的重新分区。
注意:最初,Dataframe 是从 Hadoop 中的一系列 parquet 文件中读取的。
我已经阅读了这个作为参考How does Spark partition(ing) work on files in HDFS?
【问题讨论】:
你的 spark.default.parallelism 是什么?你的 parquet 文件中有多少个分区? 我不明白第二个问题..如果你的意思是我的镶木地板文件的大小,我不知道如何检查。否则 default.parallelism 没有设置,因此使用默认的..total number of cores on all executor nodes or 2, whichever is larger
并且 CPU 内核的分配是动态的。
你可以通过hdfs文件目录中“partXXX”文件的数量来查看parquet文件的分区数量。这是您在读取文件后将拥有的起始分区数。阅读文件后,您始终可以执行 rdd.coalesce(10)。
你知道任何可以帮助我计算文件“partXXX”数量的命令行吗?我做了一个小的谷歌搜索,但找不到。
如果 parquet 文件是 50 个分区,您将拥有文件 part-0000 到 part-0049 。所以你只需 ls 目录并按文件名排序。
【参考方案1】:
只要有洗牌,就会有一个新的阶段。和
重新分区会导致洗牌,这就是为什么你有两个阶段。
当您要多次使用数据帧时使用缓存
避免读两遍。
使用合并而不是重新分配。我认为它会减少洗牌,因为它只会减少分区的数量。
【讨论】:
该数据的大小是初始的。不是df
一个,因为它被选中(选择一些行)。我正在尝试对选定的行进行分区,而不是 3.1 TB 的数据..
很遗憾,您的回答没有回答我的问题。对于这个测试用例,我想使用 repartition
而不是 coalesce
。如果由于重新分区而发生改组,为什么我没有按照在 DataFrame 上执行的第二个操作中的要求重新分区数据?以上是关于重新分区 pyspark 数据帧失败以及如何避免初始分区大小的主要内容,如果未能解决你的问题,请参考以下文章
pyspark:在日期和时间上重新采样 pyspark 数据帧