Spark 重新分区执行器
Posted
技术标签:
【中文标题】Spark 重新分区执行器【英文标题】:Spark Repartition Executors 【发布时间】:2020-07-29 16:06:42 【问题描述】:我有一个大约 100GB 的数据源,我正在尝试使用日期列对其进行分区。
为了避免分区内出现小块,我添加了一个 repartition(5) 以使每个分区内最多有 5 个文件:
df.repartition(5).write.orc("path")
我的问题是,我分配的 30 个执行程序中只有 5 个实际运行。最终我得到了我想要的(每个分区内有 5 个文件),但是由于只有 5 个执行器在运行,因此执行时间非常长。
您对我如何使它更快有什么建议吗?
【问题讨论】:
【参考方案1】:我简单地修复了它:
df.repartition($"dateColumn").write.partitionBy("dateColumn").orc(path)
并分配与我将在输出中拥有的分区数量相同数量的执行器。
谢谢大家
【讨论】:
【参考方案2】:您可以使用 repartition 和 partitionBy 来解决问题。 有两种方法可以解决这个问题。
假设你需要按 dateColumn 进行分区
df.repartition(5, 'dateColumn').write.partitionBy('dateColumn').parquet(path)
在这种情况下,使用的执行程序的数量将等于5 * distinct(dateColumn)
,并且您的所有日期都将包含 5 个文件。
另一种方法是将数据重新分区 3 次,然后使用 maxRecordsPerFile
保存数据,这将创建相同大小的文件,但您将无法控制创建的文件数量
df.repartition(60).write.option('maxRecordsPerFile',200000).partitionBy('dateColumn').parquet(path)
【讨论】:
嗨@Shubham,谢谢你的回答,但是这两种解决方案都不适合我。第一个在最后一步中仍然使用 5 个执行程序,这花费了很多时间,第二个创建了大小相同但每个文件有 60 个文件的文件,我试图避免 一次处理多少个分区。如果您只有一个日期列,则将使用 5 个执行者【参考方案3】:Spark 可以为 RDD 或数据帧的每个分区运行 1 个并发任务(最多为集群中的核心数)。如果您的集群有 30 个核心,那么您应该至少有 30 个分区。另一方面,单个分区通常不应包含超过 128MB,并且单个 shuffle 块不能大于 2GB(请参阅 SPARK-6235)。 由于您想减少执行时间,因此最好增加分区数量,并在作业结束时减少特定作业的分区数量。 为了更好地(平等地)在分区之间分配数据,最好使用散列分区器。
【讨论】:
以上是关于Spark 重新分区执行器的主要内容,如果未能解决你的问题,请参考以下文章
在 spark DF 中使用 partitionBy 后是不是可以重新分区?