读取大量 parquet 文件:read_parquet vs from_delayed

Posted

技术标签:

【中文标题】读取大量 parquet 文件:read_parquet vs from_delayed【英文标题】:Reading large number of parquet files: read_parquet vs from_delayed 【发布时间】:2020-05-13 00:09:43 【问题描述】:

我正在将大量(100 到 1000 个)镶木地板文件读入单个 dask 数据帧(单台机器,所有本地)。我意识到了

files = ['file1.parq', 'file2.parq', ...]
ddf = dd.read_parquet(files, engine='fastparquet')
ddf.groupby(['col_A', 'col_B']).value.sum().compute()

效率低很多
from dask import delayed
from fastparquet import ParquetFile

@delayed
def load_chunk(pth):
    return ParquetFile(pth).to_pandas()

ddf = dd.from_delayed([load_chunk(f) for f in files])
ddf.groupby(['col_A', 'col_B']).value.sum().compute()

对于我的特定应用程序,第二种方法 (from_delayed) 需要 6 秒才能完成,第一种方法需要 39 秒。在dd.read_parquet 案例中,在工人开始做某事之前似乎有很多开销,并且有很多transfer-... 操作分散在任务流图中。我想了解这里发生了什么。 read_parquet 方法慢得多的原因可能是什么?它与仅仅读取文件并将它们分成块有什么不同?

【问题讨论】:

我看到了同样的行为 - 大量文件中的 read_parquet 需要几分钟才能启动 Dask 集群。就挂在那里。上面的@delayed 版本更快。我尝试了 gather_statistics=False,但它什么也没做。 【参考方案1】:

您正在体验客户端尝试建立数据列的最小/最大统计信息,从而为数据帧建立良好的索引。索引对于防止读取特定工作不需要的数据文件非常有用。

在很多情况下,这是一个好主意,因为文件中的数据量很大,而文件的总数很少。在其他情况下,相同的信息可能包含在特殊的“_metadata”文件中,因此无需先读取所有文件。

为了防止扫描文件的页脚,你应该调用

dd.read_parquet(..,. gather_statistics=False)

这应该是 dask 下一版本的默认设置。

【讨论】:

谢谢!将 gather_statistics 设置为 False 时,时间几乎相等。我应该优化我编写镶木地板文件的方式...

以上是关于读取大量 parquet 文件:read_parquet vs from_delayed的主要内容,如果未能解决你的问题,请参考以下文章

从目录读取镶木地板文件时,pyspark不保存

如何使用 Parquet.net 从 Parquet 文件中仅读取列的一部分?

读取 PySpark 中的所有分区 parquet 文件

Hive 不读取 Spark 生成的分区 parquet 文件

如何通过python读取30G parquet文件

Impala 无法从 Parquet 文件中读取无日期时间戳