Spark:加入时设置最大分区大小

Posted

技术标签:

【中文标题】Spark:加入时设置最大分区大小【英文标题】:Spark: set maximum partition size when joining 【发布时间】:2018-12-03 13:21:32 【问题描述】:

在spark中做join的时候,或者一般来说shuffle操作,我可以设置最大partition数,我希望spark在哪个partition中执行这个操作。

根据文档:

spark.sql.shuffle.partitions 200 配置在为连接或聚合打乱数据时要使用的分区数。

如果我想减少每个任务中必须完成的工作量,我将不得不估计数据的总大小并相应地调整此参数(更多分区意味着在单个任务中完成的工作更少,但更多任务)。

我想知道,我可以告诉 spark 简单地根据数据量调整分区数量吗? IE。在连接操作期间设置最大分区大小?

附加问题 - 在重新划分为 200 个大致相等的分区时,Spark 如何知道要处理的数据集的总大小是多少?

提前致谢!

【问题讨论】:

【参考方案1】:

AFAIK,没有这样的选项可以针对特定输出大小的 shuffle 分区。所以这个调音是留给你的...... 在某些情况下,这在某种程度上可以在下游读取路径上解决。假设您通过 hdfs 连接数据并将输出写入 parquet。您可以将查询结果重新分区为 1(或非常少的分区数)。将其视为一个漏斗 - 使用 200 个分区执行一些聚合,然后进一步降低聚合数据的并行度(这应该涉及相对较小的 IO)。假设您的目标是 256 MB 块大小。选项是输出在它周围的某个地方,低于它或高于它。对于前两种情况,您基本上实现了您的目标,那就是避免数据过于碎片化(在 hdfs 的情况下,namenode 中的块过多)。 但是,如果您的输出远高于目标块大小,这显然会影响下游作业的执行时间,您可以使用spark.sql.files.maxPartitionBytes 来控制读取此数据的分区数。因此,即使您有 2GB 的输出,将此参数设置为 128MB 也会在读取路径上产生 16 个分区。

关于您的第二个问题,spark 只需使用哈希分区器,并计算连接列的哈希值。当然,您可以使用distribute by 来影响分区。

【讨论】:

感谢 Lior 的回答!

以上是关于Spark:加入时设置最大分区大小的主要内容,如果未能解决你的问题,请参考以下文章

spark 高层通用调优

Spark:查找 RDD 的每个分区大小

spark:区分大小写的 partitionBy 列

你如何控制输出文件的大小?

如何确定 Apache Spark 数据帧中的分区大小

在 Apache Spark 中使用 join 时,数据集大小的组织是不是重要?