为啥在 Spark 中重新分区比 partitionBy 快?
Posted
技术标签:
【中文标题】为啥在 Spark 中重新分区比 partitionBy 快?【英文标题】:Why is repartition faster than partitionBy in Spark?为什么在 Spark 中重新分区比 partitionBy 快? 【发布时间】:2021-11-15 06:25:46 【问题描述】:我正在尝试将 Spark 用于一个非常简单的用例:给定大量文件 (90k),其中包含数百万台设备的设备时间序列数据,将给定设备的所有时间序列读取分组为一个一组文件(分区)。现在假设我们的目标是 100 个分区,给定的设备数据显示在同一个输出文件中并不重要,只是同一个分区。
鉴于此问题,我们提出了两种方法来解决此问题 - repartition
然后 write
或 write
将 partitionBy
应用于 Writer
。其中任何一个的代码都非常简单:
repartition
(添加了哈希列以确保与下面的partitionBy
代码的比较是一对一的):
df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
.repartition("partition") \
.write.format("json") \
.option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
.mode("overwrite") \
.save(output_path)
partitionBy
:
df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
.write.format("json") \
.partitionBy(“partition”) \
.option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
.mode("overwrite") \
.save(output_path)
在我们的测试中,repartition
比 partitionBy
快 10 倍。这是为什么呢?
根据我的理解,repartition
会引发洗牌,我的 Spark 学习告诉我要尽可能避免这种洗牌。另一方面,partitionBy
(根据我的理解)只对每个节点进行本地排序操作 - 不需要随机播放。我是否误解了某些东西,让我认为partitionBy
会更快?
【问题讨论】:
【参考方案1】:TLDR:当您调用 partitionBy
时,Spark 会触发排序,而不是哈希重新分区。这就是为什么它在你的情况下要慢得多。
我们可以用一个玩具例子来验证一下:
spark.range(1000).withColumn("partition", 'id % 100)
.repartition('partition).write.csv("/tmp/test.csv")
不要关注灰色阶段,它被跳过,因为它是在之前的作业中计算的。
然后,partitionBy
:
spark.range(1000).withColumn("partition", 'id % 100)
.write.partitionBy("partition").csv("/tmp/test2.csv")
您可以检查是否可以在partitionBy
之前添加repartition
,排序仍然存在。那么发生了什么?请注意,第二个 DAG 中的排序不会触发洗牌。它是一个地图分区。事实上,当您调用partitionBy
时,spark 不会像人们一开始所期望的那样对数据进行洗牌。 Spark 单独对每个分区进行排序,然后每个执行程序将其数据写入相应分区的单独文件中。因此,请注意,使用partitionBy
,您不是在编写num_partitions
文件,而是在num_partitions
和num_partitions * num_executors
文件之间。每个分区的每个执行程序都有一个文件,其中包含属于该分区的数据。
【讨论】:
@Oli 对我来说,每个执行程序排序比与哈希重新分区相关的随机排序慢得多。对此有何想法以及如何使其更快? 我在写答案时正在考虑它,这确实很奇怪。你是说你有 90k 个文件。每个文件有多少条记录?还是总共? 是的,90k 个文件。文件大小中位数为 5MB,包含 4267 条记录。我在partitionBy
之前尝试过coalesce
,这确实提高了性能,但仍然比repartition
慢3 倍。
spark 是在本地运行还是在集群上运行?
12节点集群。【参考方案2】:
我认为@Oli 已经在他的 cmets 中完美地解释了这个问题的主要答案。我只想加上我的 2 美分并尝试解释一下。
假设当您读取 XML 文件 [90K 文件] 时,spark 会将其读取到 N 个分区中。这取决于spark.sql.files.maxPartitionBytes、文件格式、压缩类型等因素的数量。
假设它是 10K 个分区。这发生在下面的代码部分。
df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
假设您使用 num_partitions = 100,您将添加一个名为 partition 的新列,其值为 0-99。 Spark 只是向现有数据帧 [或 rdd] 添加一个新列,该列被拆分为 10K 分区。
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
到目前为止,两个代码都是一样的。
现在,让我们比较一下 repartition v/s partitionBy 发生了什么
案例一:重新分区
.repartition("partition") \
.write.format("json") \
在这里,您根据具有 100 个不同值的列 "partition" 对现有数据框进行重新分区。因此,现有数据帧将导致完全洗牌,将分区数量从 10K 减少到 100。这个阶段将是计算量很大的,因为涉及到一个完整的洗牌。如果一个特定分区的大小真的很大[倾斜分区],这也可能会失败。
但这里的优势在于,在写入发生的下一阶段,Spark 只需将 100 个文件写入 output_path。每个文件将只有一个数据对应列 "partition"
的一个值案例 2:partitionBy
.write.format("json") \
.partitionBy("partition") \
在这里,您要求 spark 将 现有 数据帧写入 output_path,并按列 “partition” 的不同值进行分区。您无处可要求 spark 减少数据帧的现有分区数。
因此 spark 将在 output_path 内创建新文件夹 并在其中写入每个分区对应的数据。
output_path + "\partition=0\"
output_path + "\partition=1\"
output_path + "\partition=99\"
现在,由于您在现有数据帧上有 10K 个 spark 分区,并假设 最坏情况其中每个 10K 分区都具有列 “partition”的所有不同值>,Spark 将不得不写入 10K * 100 = 1M 个文件。 即,所有 10K 分区的一部分将被写入由列 “partition” 创建的所有 100 个文件夹。这样,spark 将通过在其中创建子目录将 1M 个文件 写入 output_path。优点是我们使用这种方法跳过了一次完全随机播放。
现在与 案例 1 中的内存计算密集型 shuffle 相比,这将慢得多,因为 Spark 必须创建 1M 文件并将它们写入持久存储。 这也是,最初是一个临时文件夹,然后是 output_path。
如果写入发生在 AWS S3 或 GCP Blob 等对象存储上,这会慢得多
案例3:coalesce + partitionBy
.coalesce(num_partitions) \
.write.format("json") \
.partitionBy("partition") \
在这种情况下,您将使用 coalesce() 将 spark 分区的数量从 10K 减少到 100 个,并将其写入按列 分区的 output_path"分区”。
因此,假设 最坏情况,这 100 个分区中的每一个都具有列 “partition” 的所有不同值,spark 将不得不写 100 * 100 = 10K 个文件。
这仍将比 Case 2 快,但会比 Case 1 慢。 这是因为您正在使用 coalesce() 进行部分洗牌,但最终仍将 10K 文件 写入 output_path。
案例4:repartition+partitionBy
.repartition("partition") \
.write.format("json") \
.partitionBy("partition") \
在这种情况下,您将使用 repartition() 将 spark 分区的数量从 10K 减少到 100 [列 “partition” 的不同值] 并编写它到按列 “partition” 分区的 output_path。
因此,这 100 个分区中的每一个只有一个列 “partition” 的不同值,spark 将不得不写入 100 * 1 = 100 个文件。 partitionBy() 创建的每个子文件夹里面只有一个文件。
这将与 案例 1 花费相同的时间,因为这两个案例都涉及完全随机播放,然后写入 100 个文件。此处唯一的区别是 100 个文件将位于 output_path 下的子文件夹中。
此设置对于在通过 spark 或 hive 读取 output_path 时过滤器的谓词下推很有用。
结论:
尽管 partitionBy 比 repartition 快,但取决于数据帧分区的数量和这些分区内的数据分布,仅使用 partitionBy 可能最终代价高昂。
【讨论】:
将此标记为已接受的答案,因为我认为它更好地定义了 partitionBy 较慢的真正原因。 Oli 的回答很有帮助,但我并没有像阅读本文后那样得到我想要的令人满意的感觉。以上是关于为啥在 Spark 中重新分区比 partitionBy 快?的主要内容,如果未能解决你的问题,请参考以下文章