python dask to_parquet 占用大量内存

Posted

技术标签:

【中文标题】python dask to_parquet 占用大量内存【英文标题】:python dask to_parquet taking a lot of memory 【发布时间】:2019-12-12 03:34:19 【问题描述】:

我正在使用带有 dask 的 python 3 来读取 parquet 文件列表,进行一些处理,然后将其全部放入一个新的联合 parquet 文件中以供以后使用。

该进程使用了​​太多内存,以至于它似乎在将所有 parquet 文件写入新的 parquet 文件之前尝试将它们读入内存。

我正在使用以下代码

def t(path):
    import dask.dataframe as dd
    ddf = dd.read_parquet(path)
    ddf["file"] = path
    return ddf

b = bag.from_sequence(parquet_files)
with ProgressBar():
       data = b.map(lambda x: t(x)).\
              map(lambda y: dd.to_parquet(y, output_parquet_file, partition_on=["file"], append=True, engine="fastparquet")).\
           compute(num_workers=1)

每次使用一个worker时,内存都会爆炸,尤其是在使用更多时。这些文件很大(每个大约 1G),我试图从 csv 文件中读取信息并将它们分成 25M 块,并遇到了同样的问题。

我在这里缺少什么?当迭代过程似乎在这里做正确的事情时,为什么它会尝试将所有内容加载到内存中?我怎样才能使用 dask 操作来做到这一点,而不会炸毁我在那台机器上的 128G 内存?

PS 我尝试使用 pyarrow 引擎,但问题是附加尚未在 dask 中实现。

编辑:尝试了建议的解决方案: 我现在试试这段代码

import dask.dataframe as dd
with ProgressBar():
    dfs = [dd.read_parquet(pfile) for pfile in parquet_files]
    for i, path in enumerate(parquet_files):
        dfs[i]["file"] = path
    df = dd.concat(dfs)
    df.to_parquet(output_parquet_file)

但内存还是会爆炸(在内存超过 200G 的系统上)

【问题讨论】:

请注意,现在在 dask master 中为 pyarrow 启用了追加 【参考方案1】:

在另一个集合的地图中使用 dask 集合方法很奇怪。您可以像这样使用 bag.map 并直接调用 fastaprquet 函数,或者更好(取决于您需要执行的处理),对所有内容使用数据帧 API:

dfs = [dd.read_parquet(pfile, ...) for pfile in parquet_files]
df = dd.concat(dfs)
df.to_parquet(...)

请注意,尽管您尝试附加到单个文件(我认为),但 parquet 格式并没有真正从中受益,您最好让 Dask 为每个分区写入一个文件。

【讨论】:

对不起,不行,看看编辑,内存还在爆炸,没有文件太大内存还... 您可能想尝试使用分布式调度程序(即使在单个节点上)【参考方案2】:

dask 支持读取多个 parquet 文件作为分区。 直接调用就好了。

import dask.dataframe as dd
ddf = dd.read_parquet("parquets/*.parquet")
ddf = ddf.map_partitions(lambda df: df*2)
ddf.to_parquet("result.parquet")

【讨论】:

以上是关于python dask to_parquet 占用大量内存的主要内容,如果未能解决你的问题,请参考以下文章

由于 parquet 文件损坏,Impala 无法创建分区表

在 for 循环中将 Python Dask 系列转换为列表或 Dask DataFrame

如何将Python Dask Dataframes合并到列中?

python Dask分布式安装

使用 Dask 从多个 Python 进程编写 Parquet 文件

在dask.distributed群集中的计算机之间共享python模块