使用 dask 和多处理优化内存使用

Posted

技术标签:

【中文标题】使用 dask 和多处理优化内存使用【英文标题】:Memory usage optimization using dask and multiprocessing 【发布时间】:2021-06-08 19:13:54 【问题描述】:

我正在使用 dask 处理来自许多参数变化的数据,我的目标是通过对 dask 数组的操作构建 600 000(案例或列数)的最终 dask 数据帧,该数组由形状小于 2000 的小数组构成. 在这里,我的最终数据帧计算了 6400 个案例

dd_final.compute()
0_95_euclidean  1_95_euclidean  2_95_euclidean  3_95_euclidean  ...  96_80_l1  97_80_l1  98_80_l1  99_80_l1
0           0.005670        0.010449        0.010756        0.009914  ...  0.007422  0.002066  0.009693  0.003475
1           0.006255        0.009970        0.007987        0.007785  ...  0.006119  0.002104  0.009638  0.004142
2           0.011956        0.018662        0.016426        0.015260  ...  0.013276  0.003897  0.019816  0.007479
3           0.021639        0.037590        0.036749        0.028090  ...  0.029751  0.009725  0.038956  0.011870
4           0.014482        0.022963        0.025416        0.017909  ...  0.017033 -0.002616  0.026231  0.000978
...              ...             ...             ...             ...  ...       ...       ...       ...       ...
1289        0.597443        1.044522        0.898732        0.940219  ...  0.914094  0.792133  0.744501  0.632575
1290        0.594463        1.041562        0.894501        0.935068  ...  0.913409  0.790555  0.742357  0.628366
1291        0.592523        1.035600        0.891222        0.932510  ...  0.907414  0.786722  0.738844  0.627611
1292        0.606415        1.059642        0.912963        0.951523  ...  0.922719  0.800610  0.751161  0.640515
1293        0.601242        1.049654        0.903112        0.942681  ...  0.915391  0.794133  0.744752  0.636788

[1294 rows x 6400 columns]

第一种方法:我对每个函数都使用池星图来加速 8 核 CPU 的操作,并将结果放入 dask 数组中。

def MP_a_func(func,iterable,proc,chunk):
    #
    pool=multiprocessing.Pool(processes=proc)
    Result=pool.starmap_async(func,iterable,chunksize=chunk)
    #
    return Result

if __name__ == '__main__':
    performances=MP_a_func(Post_processing_Weights,iterable,proc,chunk)

da_arr=da.from_array(performances.get(),chunks=chunk)
#... Some operations
#...
dd_final=dd.from_dask_array(da_arr).repartition(chunk)

此方法失败,因为在存储到 dask 数组之前内存不足以存储 MP 对象。

第二种方法:我想使用池星图,但将我的可迭代切片并在每个切片上附加 dask 数组

for iterable in [iter1,iter2,...,iter10000]:
    if __name__ == '__main__':
        performances=MP_a_func(Post_processing_Weights,iterable,proc,chunk)
        partial_da_arr=da.from_array(performances.get(),chunks=chunk)
        # append or assign to da_arr ??

如何在每个步骤中使用 append 或分配给 dask 数组而不加载内存,或者有更好的方法吗?

感谢您的帮助

【问题讨论】:

我已经使用 dask delayed API 更新了上面的代码,其中import dask.delayed as delayed if __name__ == '__main__': performances=delayed(MP_a_func(Post_processing_Weights,iterable,proc,chunk) ) 【参考方案1】:

600 000(案例或列数)

根据您在示例中提供的列名称判断,您的工作流程可能会受益于更好的数据重组,这也可能会简化计算。

如何在每个步骤中使用 append 或分配给 dask 数组而不加载内存,或者有更好的方法吗?

您可能对delayed API 感兴趣,但问题/问题不够清楚,无法提供进一步的建议。

【讨论】:

谢谢,我会用延迟的API重写它

以上是关于使用 dask 和多处理优化内存使用的主要内容,如果未能解决你的问题,请参考以下文章

使用 Dask 将大于内存的数据帧缓存到本地磁盘

python dask to_parquet 占用大量内存

Dask-Rapids 数据移动和内存不足问题

即使有块,Dask也会耗尽内存

使用默认调度程序进行Dask内存管理

带有pyarrow内存的dask read_parquet爆炸