为啥在 Spark 中重新分区比 partitionBy 快?

Posted

技术标签:

【中文标题】为啥在 Spark 中重新分区比 partitionBy 快?【英文标题】:Why is repartition faster than partitionBy in Spark?为什么在 Spark 中重新分区比 partitionBy 快? 【发布时间】:2021-11-15 06:25:46 【问题描述】:

我正在尝试将 Spark 用于一个非常简单的用例:给定大量文件 (90k),其中包含数百万台设备的设备时间序列数据,将给定设备的所有时间序列读取分组为一个一组文件(分区)。现在假设我们的目标是 100 个分区,给定的设备数据显示在同一个输出文件中并不重要,只是同一个分区。

鉴于此问题,我们提出了两种方法来解决此问题 - repartition 然后 writewritepartitionBy 应用于 Writer。其中任何一个的代码都非常简单:

repartition(添加了哈希列以确保与下面的partitionBy代码的比较是一对一的):


df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \
  .withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
  .repartition("partition") \
  .write.format("json") \
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
  .mode("overwrite") \
  .save(output_path)

partitionBy:


df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \
  .withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
  .write.format("json") \
  .partitionBy(“partition”) \
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
  .mode("overwrite") \
  .save(output_path)

在我们的测试中,repartitionpartitionBy 快 10 倍。这是为什么呢?

根据我的理解,repartition 会引发洗牌,我的 Spark 学习告诉我要尽可能避免这种洗牌。另一方面,partitionBy(根据我的理解)只对每个节点进行本地排序操作 - 不需要随机播放。我是否误解了某些东西,让我认为partitionBy 会更快?

【问题讨论】:

【参考方案1】:

TLDR:当您调用 partitionBy 时,Spark 会触发排序,而不是哈希重新分区。这就是为什么它在你的情况下要慢得多。

我们可以用一个玩具例子来验证一下:

spark.range(1000).withColumn("partition", 'id % 100)
    .repartition('partition).write.csv("/tmp/test.csv")

不要关注灰色阶段,它被跳过,因为它是在之前的作业中计算的。

然后,partitionBy:

spark.range(1000).withColumn("partition", 'id % 100)
    .write.partitionBy("partition").csv("/tmp/test2.csv")

您可以检查是否可以在partitionBy 之前添加repartition,排序仍然存在。那么发生了什么?请注意,第二个 DAG 中的排序不会触发洗牌。它是一个地图分区。事实上,当您调用partitionBy 时,spark 不会像人们一开始所期望的那样对数据进行洗牌。 Spark 单独对每个分区进行排序,然后每个执行程序将其数据写入相应分区的单独文件中。因此,请注意,使用partitionBy,您不是在编写num_partitions 文件,而是在num_partitionsnum_partitions * num_executors 文件之间。每个分区的每个执行程序都有一个文件,其中包含属于该分区的数据。

【讨论】:

@Oli 对我来说,每个执行程序排序比与哈希重新分区相关的随机排序慢得多。对此有何想法以及如何使其更快? 我在写答案时正在考虑它,这确实很奇怪。你是说你有 90k 个文件。每个文件有多少条记录?还是总共? 是的,90k 个文件。文件大小中位数为 5MB,包含 4267 条记录。我在partitionBy 之前尝试过coalesce,这确实提高了性能,但仍然比repartition 慢3 倍。 spark 是在本地运行还是在集群上运行? 12节点集群。【参考方案2】:

我认为@Oli 已经在他的 cmets 中完美地解释了这个问题的主要答案。我只想加上我的 2 美分并尝试解释一下。

假设当您读取 XML 文件 [90K 文件] 时,spark 会将其读取到 N 个分区中。这取决于spark.sql.files.maxPartitionBytes文件格式压缩类型等因素的数量。

假设它是 10K 个分区。这发生在下面的代码部分。

df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \

假设您使用 num_partitions = 100,您将添加一个名为 partition 的新列,其值为 0-99。 Spark 只是向现有数据帧 [或 rdd] 添加一个新列,该列被拆分为 10K 分区。

.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \

到目前为止,两个代码都是一样的。

现在,让我们比较一下 repartition v/s partitionBy 发生了什么

案例一:重新分区

.repartition("partition") \
.write.format("json") \

在这里,您根据具有 100 个不同值的列 "partition" 对现有数据框进行重新分区。因此,现有数据帧将导致完全洗牌,将分区数量从 10K 减少到 100。这个阶段将是计算量很大的,因为涉及到一个完整的洗牌。如果一个特定分区的大小真的很大[倾斜分区],这也可能会失败。

但这里的优势在于,在写入发生的下一阶段,Spark 只需将 100 个文件写入 output_path。每个文件将只有一个数据对应列 "partition"

的一个值

案例 2:partitionBy

.write.format("json") \
.partitionBy("partition") \

在这里,您要求 spark 将 现有 数据帧写入 output_path,并按列 “partition” 的不同值进行分区。您无处可要求 spark 减少数据帧的现有分区数。

因此 spark 将在 output_path 内创建新文件夹 并在其中写入每个分区对应的数据。

output_path + "\partition=0\"
output_path + "\partition=1\"
output_path + "\partition=99\"

现在,由于您在现有数据帧上有 10K 个 spark 分区,并假设 最坏情况其中每个 10K 分区都具有列 “partition”的所有不同值>,Spark 将不得不写入 10K * 100 = 1M 个文件。 即,所有 10K 分区的一部分将被写入由列 “partition” 创建的所有 100 个文件夹。这样,spark 将通过在其中创建子目录将 1M 个文件 写入 output_path。优点是我们使用这种方法跳过了一次完全随机播放。

现在与 案例 1 中的内存计算密集型 shuffle 相比,这将慢得多,因为 Spark 必须创建 1M 文件并将它们写入持久存储。 这也是,最初是一个临时文件夹,然后是 output_path

如果写入发生在 AWS S3 或 GCP Blob 等对象存储上,这会慢得多

案例3:coalesce + partitionBy

.coalesce(num_partitions) \
.write.format("json") \
.partitionBy("partition") \

在这种情况下,您将使用 coalesce() 将 spark 分区的数量从 10K 减少到 100 个,并将其写入按列 分区的 output_path"分区”

因此,假设 最坏情况,这 100 个分区中的每一个都具有列 “partition” 的所有不同值,spark 将不得不写 100 * 100 = 10K 个文件。

这仍将比 Case 2 快,但会比 Case 1 慢。 这是因为您正在使用 coalesce() 进行部分洗牌,但最终仍将 10K 文件 写入 output_path

案例4:repartition+partitionBy

.repartition("partition") \
.write.format("json") \
.partitionBy("partition") \

在这种情况下,您将使用 repartition() 将 spark 分区的数量从 10K 减少到 100 [列 “partition” 的不同值] 并编写它到按列 “partition” 分区的 output_path

因此,这 100 个分区中的每一个只有一个列 “partition” 的不同值,spark 将不得不写入 100 * 1 = 100 个文件。 partitionBy() 创建的每个子文件夹里面只有一个文件。

这将与 案例 1 花费相同的时间,因为这两个案例都涉及完全随机播放,然后写入 100 个文件。此处唯一的区别是 100 个文件将位于 output_path 下的子文件夹中。

此设置对于在通过 spark 或 hive 读取 output_path 时过滤器的谓词下推很有用。

结论:

尽管 partitionByrepartition 快,但取决于数据帧分区的数量和这些分区内的数据分布,仅使用 partitionBy 可能最终代价高昂。

【讨论】:

将此标记为已接受的答案,因为我认为它更好地定义了 partitionBy 较慢的真正原因。 Oli 的回答很有帮助,但我并没有像阅读本文后那样得到我想要的令人满意的感觉。

以上是关于为啥在 Spark 中重新分区比 partitionBy 快?的主要内容,如果未能解决你的问题,请参考以下文章

为啥文件拆分的大小不会随着我重新分区数据而减少?

在同一个 Spark 作业中设置每个 shuffle 的 shuffle 分区数

在Oracle中,为啥删除表分区时公共同义词会失效

为啥 spark 没有在多个节点上重新分配我的数据帧?

如何使用合并更改分区数?

为啥今天集群/分区上的查询成本远高于以前的日期?