在 csv 与 delta 表中使用 df.coalesce(1)

Posted

技术标签:

【中文标题】在 csv 与 delta 表中使用 df.coalesce(1)【英文标题】:use of df.coalesce(1) in csv vs delta table 【发布时间】:2021-05-14 19:48:06 【问题描述】:

当保存到增量表时,我们避免使用“df.coalesce(1)”,但是当保存到 csv 或 parquet 时,我们(我的团队)添加“df.coalesce(1)”。这是一种常见的做法吗?为什么?是强制性的吗?

【问题讨论】:

根据我的看法,Delta 格式有 auto optimize and auto compact 内置。使用coalesce(1) 会将所有文件合并为一个,保存后,自动压缩将执行相同的工作。您必须编写的任何其他格式,您只想保留 1 个文件。 【参考方案1】:

在我看到df.coalesce(1) 的大多数情况下,它只生成一个文件,例如,将 CSV 文件导入 Excel,或将 Parquet 文件导入基于 Pandas 的程序。但是如果你在做.coalesce(1),那么写入是通过单个任务发生的,它会成为性能瓶颈,因为你需要从其他执行者那里获取数据并写入。

如果您使用来自 Spark 或其他分布式系统的数据,拥有多个文件将有助于提高性能,因为您可以并行写入和读取它们。默认情况下,Spark 会将 N 个文件写入目录,其中 N 是分区数。正如@pltc 注意到的那样,这可能会生成大量通常不受欢迎的文件,因为访问它们会导致性能开销。所以我们需要在文件数量和它们的大小之间取得平衡——对于 Parquet 和 Delta(基于 Parquet)来说,拥有更大的文件会带来一些性能优势——你读取的文件更少,你可以更好地压缩里面的数据文件等。

特别是对于 Delta,.coalesce(1) 与其他文件格式存在相同的问题 - 您正在通过一项任务编写。从性能的角度来看,依赖默认的 Spark 行为并写入多个文件是有益的 - 每个节点都在并行写入其数据,但是您可以获得太多的小文件(因此您可以使用 .coalesce(N) 来写入更大的文件)。对于 Databricks Delta,正如 @Kafels 正确指出的那样,有 some optimizations 将允许删除 .coalesce(N) 并进行自动调整以实现最佳吞吐量(所谓的“优化写入”),并创建更大的文件( "Auto compaction") - 但应谨慎使用。

总体而言,Delta 的最佳文件大小主题是一个有趣的主题 - 如果您有大文件(OPTIMIZE 命令默认使用 1Gb),您可以获得更好的读取吞吐量,但如果您使用 MERGE 重写它们/UPDATE/DELETE,那么从性能角度来看,大文件是不好的,最好有更小的(16-64-128Mb)文件,这样你可以重写更少的数据。

【讨论】:

【参考方案2】:

TL;DR:这不是强制性的,这取决于您的数据框的大小。

长答案

如果您的数据帧为 10Mb,并且您有 1000 个分区,则每个文件大约为 10Kb。拥有这么多小文件会大大降低 Spark 的性能,更不用说当你有太多文件时,你最终会达到OS limitation of the number of files。无论如何,当您的数据集足够小时,您应该将它们合并到 coalesce 的几个文件中。

但是,如果您的数据帧为 100G,从技术上讲,您仍然可以使用 coalesce(1) 并保存到单个文件中,但稍后您在读取它时将不得不处理较少的并行度。

【讨论】:

以上是关于在 csv 与 delta 表中使用 df.coalesce(1)的主要内容,如果未能解决你的问题,请参考以下文章

WriteStream 无法在 Delta 表中写入数据

Pyspark 从 csv 文件中读取 delta/upsert 数据集

Delta Lake:如何在下一个版本的 delta 表中不携带已删除的记录?

更新 hive 表中的增量记录

Delta E (CIE Lab) 在 SQL 中计算和排序的性能

Spark 从另一个表更新 Delta 中的多个列