如何使用 Python Ray 在不耗尽内存的情况下并行处理大量数据?

Posted

技术标签:

【中文标题】如何使用 Python Ray 在不耗尽内存的情况下并行处理大量数据?【英文标题】:How to process massive amounts of data in parallel without using up memory with Python Ray? 【发布时间】:2019-09-25 08:43:52 【问题描述】:

我正在考虑使用 Ray 来实现数据并行处理的简单实现:

需要处理大量数据项,这些数据项可通过流/迭代器获得。每件商品的尺寸都很大 应在每个项目上运行一个函数,并将产生一个显着大小的结果 处理后的数据应该在流中传递或存储在某种接收器中,该接收器只能在某个时间段内接受一定数量的数据

我想知道这是否可以在 Ray 中完成。

目前我有以下基于pythons多处理库的简单实现:

一个进程读取流并将项目传递给队列,该队列将在 k 个项目后阻塞(这样队列所需的内存不会超过某个限制) 有几个工作进程将从输入队列中读取并处理项目。处理后的项目被传递到结果队列,该队列的大小也有限 另一个进程读取结果队列以传递项目

这样,一旦工作人员无法处理更多项目,队列将阻塞,并且不会尝试将更多工作传递给工作人员。 如果 sink 进程无法存储更多的项目,结果队列将阻塞,进而阻塞 worker,worker 反过来阻塞输入队列,直到 writer 进程可以再次写入更多结果。

那么,Ray 有抽象来做这样的事情吗?我如何确保只能将一定数量的工作传递给工作人员,我如何才能拥有像单进程输出函数这样的东西,并确保工作人员不能用如此多的结果淹没该函数,以至于内存/存储耗尽?

【问题讨论】:

【参考方案1】:

有一个适用于 Ray 的实验性流 API,您可能会发现它很有用:https://github.com/ray-project/ray/tree/master/python/ray/experimental/streaming

它为流式数据源、自定义运算符和接收器提供基本构造。您还可以通过限制队列大小为您的应用程序设置最大内存占用。

您能否分享一些有关您的应用程序的其他信息?

我们在谈论什么类型的数据?单个数据项有多大(以字节为单位)?

【讨论】:

虽然这回答了这个问题,但最好将底部的几行作为评论。一旦您获得足够的reputation,您就可以将 cmets 留在其他用户的帖子中,以寻求问题发布者的澄清。【参考方案2】:

对于这个用例,我推荐 Ray 的 parallel iterators。首先,您将创建一个生成器,该生成器从您的流式生成器中获取大对象(请参阅ray.util.iter.from_iterators())并对这些项目进行链式操作(请参阅.for_each())。至关重要的是,中间对象(本身可能很大)一旦被链中的下一个函数消耗,就会从内存中逐出,从而防止内存不足。

最后,您可以使用.take() 方法控制队列上的执行,直到您的数据接收器准备好为止。

【讨论】:

以上是关于如何使用 Python Ray 在不耗尽内存的情况下并行处理大量数据?的主要内容,如果未能解决你的问题,请参考以下文章

如何在不耗尽内存的情况下部署包含 130,000 多个条目的脚本

如何在不耗尽内存的情况下运行大型 Mahout 模糊 kmeans 聚类?

Google Big Query + PHP -> 如何在不耗尽内存的情况下获取大型数据集

在不耗尽内存的情况下检索图像

如何在不使用太多内存的情况下强制下载大文件?

如何在不耗尽电池的情况下监控 MPMoviePlayerController 播放进度?