是否可以从 Dask 读取镶木地板元数据?

Posted

技术标签:

【中文标题】是否可以从 Dask 读取镶木地板元数据?【英文标题】:Is it possible to read parquet metadata from Dask? 【发布时间】:2020-02-01 19:24:52 【问题描述】:

我有数以千计的镶木地板文件需要处理。在处理文件之前,我尝试使用 parquet 元数据获取有关文件的各种信息,例如每个分区中的行数、最小值、最大值等。

我尝试使用 dask.delayed 读取元数据,希望在我的集群中分发元数据收集任务,但这似乎会导致 Dask 不稳定。请参阅下面的示例代码 sn-p 和节点超时错误。

有没有办法从 Dask 读取 parquet 元数据?我知道 Dask 的“read_parquet”函数有一个“gather_statistics”选项,您可以将其设置为 false 以加快文件读取速度。但是,如果设置为 true,我看不到访问所有 parquet 元数据/统计信息的方法。

示例代码:


@dask.delayed
def get_pf(item_to_read):
     pf = fastparquet.ParquetFile(item_to_read)
     row_groups = pf.row_groups.copy()
     all_stats = pf.statistics.copy()
     col = pf.info['columns'].copy()
     return [row_groups, all_stats, col]

stats_arr = get_pf(item_to_read)

示例错误:

2019-10-03 01:43:51,202 - INFO - 192.168.0.167 - distributed.worker - ERROR - Worker stream died during communication: tcp://192.168.0.223:34623

2019-10-03 01:43:51,203 - INFO - 192.168.0.167 - Traceback (most recent call last):

2019-10-03 01:43:51,204 - INFO - 192.168.0.167 -   File "/usr/local/lib/python3.7/dist-packages/distributed/comm/core.py", line 218, in connect

2019-10-03 01:43:51,206 - INFO - 192.168.0.167 -     quiet_exceptions=EnvironmentError,

2019-10-03 01:43:51,207 - INFO - 192.168.0.167 -   File "/usr/local/lib/python3.7/dist-packages/tornado/gen.py", line 729, in run

2019-10-03 01:43:51,210 - INFO - 192.168.0.167 -     value = future.result()

2019-10-03 01:43:51,211 - INFO - 192.168.0.167 - tornado.util.TimeoutError: Timeout

2019-10-03 01:43:51,212 - INFO - 192.168.0.167 -

2019-10-03 01:43:51,213 - INFO - 192.168.0.167 - During handling of the above exception, another exception occurred:

2019-10-03 01:43:51,214 - INFO - 192.168.0.167 -

2019-10-03 01:43:51,215 - INFO - 192.168.0.167 - Traceback (most recent call last):

2019-10-03 01:43:51,217 - INFO - 192.168.0.167 -   File "/usr/local/lib/python3.7/dist-packages/distributed/worker.py", line 1841, in gather_dep

2019-10-03 01:43:51,218 - INFO - 192.168.0.167 -     self.rpc, deps, worker, who=self.address

2019-10-03 01:43:51,219 - INFO - 192.168.0.167 -   File "/usr/local/lib/python3.7/dist-packages/tornado/gen.py", line 729, in run

2019-10-03 01:43:51,220 - INFO - 192.168.0.167 -     value = future.result()

2019-10-03 01:43:51,222 - INFO - 192.168.0.167 -   File "/usr/local/lib/python3.7/dist-packages/tornado/gen.py", line 736, in run

2019-10-03 01:43:51,223 - INFO - 192.168.0.167 -     yielded = self.gen.throw(*exc_info)  # type: ignore

2019-10-03 01:43:51,224 - INFO - 192.168.0.167 -   File "/usr/local/lib/python3.7/dist-packages/distributed/worker.py", line 3029, in get_data_from_worker

2019-10-03 01:43:51,225 - INFO - 192.168.0.167 -     comm = yield rpc.connect(worker)

2019-10-03 01:43:51,640 - INFO - 192.168.0.167 -   File "/usr/local/lib/python3.7/dist-packages/tornado/gen.py", line 729, in run

2019-10-03 01:43:51,641 - INFO - 192.168.0.167 -     value = future.result()

2019-10-03 01:43:51,643 - INFO - 192.168.0.167 -   File "/usr/local/lib/python3.7/dist-packages/tornado/gen.py", line 736, in run

2019-10-03 01:43:51,644 - INFO - 192.168.0.167 -     yielded = self.gen.throw(*exc_info)  # type: ignore

2019-10-03 01:43:51,645 - INFO - 192.168.0.167 -   File "/usr/local/lib/python3.7/dist-packages/distributed/core.py", line 866, in connect

2019-10-03 01:43:51,646 - INFO - 192.168.0.167 -     connection_args=self.connection_args,

2019-10-03 01:43:51,647 - INFO - 192.168.0.167 -   File "/usr/local/lib/python3.7/dist-packages/tornado/gen.py", line 729, in run

2019-10-03 01:43:51,649 - INFO - 192.168.0.167 -     value = future.result()

2019-10-03 01:43:51,650 - INFO - 192.168.0.167 -   File "/usr/local/lib/python3.7/dist-packages/tornado/gen.py", line 736, in run

2019-10-03 01:43:51,651 - INFO - 192.168.0.167 -     yielded = self.gen.throw(*exc_info)  # type: ignore

2019-10-03 01:43:51,652 - INFO - 192.168.0.167 -   File "/usr/local/lib/python3.7/dist-packages/distributed/comm/core.py", line 230, in connect

2019-10-03 01:43:51,653 - INFO - 192.168.0.167 -     _raise(error)

2019-10-03 01:43:51,654 - INFO - 192.168.0.167 -   File "/usr/local/lib/python3.7/dist-packages/distributed/comm/core.py", line 207, in _raise

2019-10-03 01:43:51,656 - INFO - 192.168.0.167 -     raise IOError(msg)

2019-10-03 01:43:51,657 - INFO - 192.168.0.167 - OSError: Timed out trying to connect to 'tcp://192.168.0.223:34623' after 10 s: connect() didn't finish in time

【问题讨论】:

【参考方案1】:

dd.read_parquet 需要很长时间吗?如果没有,那么您可以按照其中的任何策略在客户端中进行阅读。

如果数据在根目录中有一个 _metadata 文件,那么您可以简单地使用 fastparquet 打开它,这正是 Dask 所做的。它包含所有数据片段的所有详细信息。

分发读取的元数据没有什么特别的原因,但您应该注意,在某些情况下,元数据项的总数可能会达到相当大的规模。

【讨论】:

谢谢,@mdurant。 dd.read_parquet 本身很快,并且直接给了我很多信息(例如,列名),但是获取像每个分区中的行数这样的信息,比直接读取元数据要慢得多,因为你基本上必须坚持/计算整个 dask 读取操作。因此,听起来像我一直在做的那样延迟 fastparquet 元数据读取操作并计算这些操作以分发操作是最好的方法。也许我还有其他导致不稳定的问题。

以上是关于是否可以从 Dask 读取镶木地板元数据?的主要内容,如果未能解决你的问题,请参考以下文章

使用 to_parquet() 将 dask 数据帧写入镶木地板结果“RuntimeError:文件元数据仅在写入器关闭后可用”

是否可以将巨大的 dask 数据框保存到镶木地板中?

在读取镶木地板文件时刷新 Dataframe 的元数据

如何确保镶木地板文件包含元数据中的行数?

使用 pyspark 比较镶木地板文件的元数据

如何使用 dask/fastparquet 从多个目录中读取多个 parquet 文件(具有相同架构)