如何使用 dask/fastparquet 从多个目录中读取多个 parquet 文件(具有相同架构)

Posted

技术标签:

【中文标题】如何使用 dask/fastparquet 从多个目录中读取多个 parquet 文件(具有相同架构)【英文标题】:How to read multiple parquet files (with same schema) from multiple directories with dask/fastparquet 【发布时间】:2018-03-04 09:57:16 【问题描述】:

我需要使用 dask 将具有相同架构的多个镶木地板文件加载到单个数据帧中。这在它们都在同一个目录中时有效,但在它们位于不同目录时无效。

例如:

import fastparquet
pfile = fastparquet.ParquetFile(['data/data1.parq', 'data/data2.parq'])

工作正常,但如果我将data2.parq 复制到不同的目录,以下内容将不起作用:

pfile = fastparquet.ParquetFile(['data/data1.parq', 'data2/data2.parq'])

我得到的回溯如下:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-11-b3d381f14edc> in <module>()
----> 1 pfile = fastparquet.ParquetFile(['data/data1.parq', 'data2/data2.parq'])

~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep)
     82         if isinstance(fn, (tuple, list)):
     83             basepath, fmd = metadata_from_many(fn, verify_schema=verify,
---> 84                                                open_with=open_with)
     85             self.fn = sep.join([basepath, '_metadata'])  # effective file
     86             self.fmd = fmd

~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with)
    164     else:
    165         raise ValueError("Merge requires all PaquetFile instances or none")
--> 166     basepath, file_list = analyse_paths(file_list, sep)
    167 
    168     if verify_schema:

~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/util.py in analyse_paths(file_list, sep)
    221     if len(tuple([p.split('=')[0] for p in parts[l:-1]])
    222             for parts in path_parts_list) > 1:
--> 223         raise ValueError('Partitioning directories do not agree')
    224     for path_parts in path_parts_list:
    225         for path_part in path_parts[l:-1]:

ValueError: Partitioning directories do not agree

我在使用 dask.dataframe.read_parquet 时遇到同样的错误,我假设它使用相同的 ParquetFile 对象。

如何从不同目录加载多个文件?将我需要加载的所有文件放到同一个目录中不是一种选择。

【问题讨论】:

【参考方案1】:

一种解决方法是分别读取每个块并传递给dask.dataframe.from_delayed。这与 read_parquet 所做的元数据处理并不完全相同('index' 下方应该是索引),但在其他方面应该可以工作。

import dask.dataframe as dd    
from dask import delayed    
from fastparquet import ParquetFile

@delayed
def load_chunk(pth):
    return ParquetFile(pth).to_pandas()

files = ['temp/part.0.parquet', 'temp2/part.1.parquet']
df = dd.from_delayed([load_chunk(f) for f in files])

df.compute()
Out[38]: 
   index  a
0      0  1
1      1  2
0      2  3
1      3  4

【讨论】:

【参考方案2】:

如果使用绝对路径或显式相对路径,这确实适用于 master 上的 fastparquet:

pfile = fastparquet.ParquetFile(['./data/data1.parq', './data2/data2.parq'])

需要领先的./ 应该被视为一个错误 - 请参阅问题。

【讨论】:

【参考方案3】:

Dask API documentation 声明:

要从多个文件中读取,您可以传递一个 globstring 或路径列表 [...]。

以下解决方案允许在单个 parquet 文件中使用不同的列,这对于 this answer 是不可能的。它将被并行化,因为它是一个原生 dask 命令。

import dask.dataframe as dd


files = ['temp/part.0.parquet', 'temp2/part.1.parquet']
df = dd.read_parquet(files)

df.compute()

【讨论】:

虽然此代码可以解决 OP 的问题,但最好包含关于您的代码如何解决 OP 问题的说明。这样,未来的访问者可以从您的帖子中学习,并将其应用到他们自己的代码中。 SO 不是编码服务,而是知识资源。此外,高质量、完整的答案更有可能获得支持。这些功能,以及所有帖子都是独立的要求,是 SO 作为一个平台的一些优势,使其与论坛区分开来。您可以编辑以添加其他信息和/或使用源文档补充您的解释。

以上是关于如何使用 dask/fastparquet 从多个目录中读取多个 parquet 文件(具有相同架构)的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Where 子句从多个表中删除多行?

如何使用 PHP 表单从多个表中检索数据?

如何使用多个包从命令行运行 Selenium testNG 文件?

如何从命令行使用多个 AWS 账户?

如何使用javascript从字符串末尾删除多个逗号?

使用多个 AuthenticationProvider 时如何从 PreAuthenticatedAuthenticationProvider 重定向 UsernameNotFoundExceptio