How to read a single parquet file from s3 into a dask dataframe?

Posted: 2018-06-24 20:37:25

I'm trying to read a single snappy-compressed parquet file from S3 into a Dask dataframe. There is no metadata directory, since the file was written with Spark 2.1.

Reading it locally with fastparquet does not work:

import dask.dataframe as dd
dd.read_parquet('test.snappy.parquet', engine='fastparquet')

I get these exceptions:

NotADirectoryError                        Traceback (most recent call last)
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep, root)
     95                 self.fn = fn2
---> 96                 with open_with(fn2, 'rb') as f:
     97                     self._parse_header(f, verify)

~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/bytes/core.py in __enter__(self)
    311         mode = self.mode.replace('t', '').replace('b', '') + 'b'
--> 312         f = f2 = self.myopen(self.path, mode=mode)
    313         CompressFile = merge(seekable_files, compress_files)[self.compression]

~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/bytes/local.py in open(self, path, mode, **kwargs)
     60         path = self._trim_filename(path)
---> 61         return open(path, mode=mode)
     62 

NotADirectoryError: [Errno 20] Not a directory: '/home/arinarmo/test.snappy.parquet/_metadata'

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
    118         try:
--> 119             fmd = read_thrift(f, parquet_thrift.FileMetaData)
    120         except Exception:

~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/thrift_structures.py in read_thrift(file_obj, ttype)
     21     obj = ttype()
---> 22     obj.read(pin)
     23 

~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/parquet_thrift/parquet/ttypes.py in read(self, iprot)
   1864         if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
-> 1865             iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec))
   1866             return

TypeError: expecting list of size 2 for struct args

During handling of the above exception, another exception occurred:

ParquetException                          Traceback (most recent call last)
<ipython-input-21-0dc755d9917b> in <module>()
----> 1 dd.read_parquet('test.snappy.parquet', engine='fastparquet')

~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options, engine)
    763 
    764     return read(fs, paths, file_opener, columns=columns, filters=filters,
--> 765                 categories=categories, index=index)
    766 
    767 

~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/dataframe/io/parquet.py in _read_fastparquet(fs, paths, myopen, columns, filters, categories, index, storage_options)
    209                                          sep=fs.sep)
    210         except Exception:
--> 211             pf = fastparquet.ParquetFile(paths[0], open_with=myopen, sep=fs.sep)
    212 
    213     check_column_names(pf.columns, categories)

~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep, root)
    100                 self.fn = fn
    101                 with open_with(fn, 'rb') as f:
--> 102                     self._parse_header(f, verify)
    103         self.open = open_with
    104 

~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
    120         except Exception:
    121             raise ParquetException('Metadata parse failed: %s' %
--> 122                                    self.fn)
    123         self.head_size = head_size
    124         self.fmd = fmd

ParquetException: Metadata parse failed: test.snappy.parquet

It works with a local parquet file and pyarrow:

dd.read_parquet('test.snappy.parquet', engine='pyarrow')

Finally, trying S3 with pyarrow also fails:

dd.read_parquet('s3://redacted-location/test.snappy.parquet', engine='pyarrow')

with the following exception:

~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options, engine)
    763 
    764     return read(fs, paths, file_opener, columns=columns, filters=filters,
--> 765                 categories=categories, index=index)
    766 
    767 

~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/dataframe/io/parquet.py in _read_pyarrow(fs, paths, file_opener, columns, filters, categories, index)
    492         columns = list(columns)
    493 
--> 494     dataset = pq.ParquetDataset(paths, filesystem=get_pyarrow_filesystem(fs))
    495     schema = dataset.schema.to_arrow_schema()
    496     has_pandas_metadata = schema.metadata is not None and b'pandas' in schema.metadata

~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/pyarrow/parquet.py in __init__(self, path_or_paths, filesystem, schema, metadata, split_row_groups, validate_schema)
    703 
    704         if validate_schema:
--> 705             self.validate_schemas()
    706 
    707     def validate_schemas(self):

~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/pyarrow/parquet.py in validate_schemas(self)
    712                 self.schema = open_file(self.metadata_path).schema
    713             else:
--> 714                 self.schema = self.pieces[0].get_metadata(open_file).schema
    715         elif self.schema is None:
    716             self.schema = self.metadata.schema

IndexError: list index out of range

In this question it is suggested to use fastparquet.writer.merge, since it should write the metadata directory, but for me it fails with the same error as before (see the sketch below for roughly what I ran).
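For context, what I tried looked roughly like this minimal sketch; spark_output/ is a placeholder for a local copy of the directory Spark wrote, and I'm assuming fastparquet.writer.merge accepts the list of part files:

import glob
import fastparquet

# Placeholder path: a local copy of the Spark output directory containing
# part-*.snappy.parquet files. merge() is meant to write a combined _metadata
# file next to the parts, which dask/fastparquet could then pick up.
parts = sorted(glob.glob('spark_output/*.parquet'))
fastparquet.writer.merge(parts)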

Comments:

I suggest filing this as a bug report at github.com/dask/dask/issues/new. See also dask.pydata.org/en/latest/support.html

Answer 1:

The error fastparquet gives is misleading: it first tries to load the path as a directory, fails, and then falls back to loading the path directly as a file. The real failure is in decoding the thrift metadata. Since this commit, you may find that parsing the file now works.
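For what it's worth, a minimal sketch of the direct read once a fastparquet version containing that fix is installed (the bucket path and storage_options values are placeholders, and s3fs is assumed to be available):

import dask.dataframe as dd

# With a fixed fastparquet, the single snappy parquet file can be read straight
# from S3; storage_options is forwarded to s3fs for authentication.
df = dd.read_parquet(
    's3://redacted-location/test.snappy.parquet',
    engine='fastparquet',
    storage_options={'anon': False},  # placeholder credential settings
)
print(df.head())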

