Spark 重新分区执行器

Posted

技术标签:

【中文标题】Spark 重新分区执行器【英文标题】:Spark Repartition Executors 【发布时间】:2020-07-29 16:06:42 【问题描述】:

我有一个大约 100GB 的数据源,我正在尝试使用日期列对其进行分区。

为了避免分区内出现小块,我添加了一个 repartition(5) 以使每个分区内最多有 5 个文件:

df.repartition(5).write.orc("path")

我的问题是,我分配的 30 个执行程序中只有 5 个实际运行。最终我得到了我想要的(每个分区内有 5 个文件),但是由于只有 5 个执行器在运行,因此执行时间非常长。

您对我如何使它更快有什么建议吗?

【问题讨论】:

【参考方案1】:

我简单地修复了它:

df.repartition($"dateColumn").write.partitionBy("dateColumn").orc(path)

并分配与我将在输出中拥有的分区数量相同数量的执行器。

谢谢大家

【讨论】:

【参考方案2】:

您可以使用 repartition 和 partitionBy 来解决问题。 有两种方法可以解决这个问题。

假设你需要按 dateColumn 进行分区

df.repartition(5, 'dateColumn').write.partitionBy('dateColumn').parquet(path)

在这种情况下,使用的执行程序的数量将等于5 * distinct(dateColumn),并且您的所有日期都将包含 5 个文件。

另一种方法是将数据重新分区 3 次,然后使用 maxRecordsPerFile 保存数据,这将创建相同大小的文件,但您将无法控制创建的文件数量

df.repartition(60).write.option('maxRecordsPerFile',200000).partitionBy('dateColumn').parquet(path)

【讨论】:

嗨@Shubham,谢谢你的回答,但是这两种解决方案都不适合我。第一个在最后一步中仍然使用 5 个执行程序,这花费了很多时间,第二个创建了大小相同但每个文件有 60 个文件的文件,我试图避免 一次处理多少个分区。如果您只有一个日期列,则将使用 5 个执行者【参考方案3】:

Spark 可以为 RDD 或数据帧的每个分区运行 1 个并发任务(最多为集群中的核心数)。如果您的集群有 30 个核心,那么您应该至少有 30 个分区。另一方面,单个分区通常不应包含超过 128MB,并且单​​个 shuffle 块不能大于 2GB(请参阅 SPARK-6235)。 由于您想减少执行时间,因此最好增加分区数量,并在作业结束时减少特定作业的分区数量。 为了更好地(平等地)在分区之间分配数据,最好使用散列分区器。

【讨论】:

以上是关于Spark 重新分区执行器的主要内容,如果未能解决你的问题,请参考以下文章

在 spark DF 中使用 partitionBy 后是不是可以重新分区?

在 Spark 中处理压缩文件:重新分区可以提高还是降低性能

Spark重新分区不均匀分布记录

Spark DataFrame重新分区:未保留的分区数

Spark中的最佳重新分区方式

为啥在 Spark 中重新分区比 partitionBy 快?