如何使用 python Ray 在一个大列表上并行化?

Posted

技术标签:

【中文标题】如何使用 python Ray 在一个大列表上并行化?【英文标题】:How to use python Ray to parallelise over a large list? 【发布时间】:2021-01-22 14:58:09 【问题描述】:

我想使用ray 对列表的每个元素执行函数的并行操作。下面是一个简化的sn-p

import numpy as np
import time

import ray
import psutil
num_cpus = psutil.cpu_count(logical=False)
ray.init(num_cpus=num_cpus)


@ray.remote
def f(a, b, c):
    return a * b - c


def g(a, b, c):
    return a * b - c


def my_func_par(large_list):
    # arguments a and b are constant just to illustrate
    # argument c is is each element of a list large_list
    [f.remote(1.5, 2, i) for i in large_list]


def my_func_seq(large_list):
    # arguments a anf b are constant just to illustrate
    # argument c is is each element of a list large_list
    [g(1.5, 2, i) for i in large_list]

my_list = np.arange(1, 10000)

s = time.time()
my_func_par(my_list)
print(time.time() - s)
>>> 2.007

s = time.time()
my_func_seq(my_list)
print(time.time() - s)
>>> 0.0372

问题是,当我计时my_func_par 时,它比my_func_seq 慢得多(如上图所示~54x)。 ray 的一位作者确实回答了关于 this blog 的评论,这似乎解释了我正在做的是设置 len(large_list) 不同的任务,这是不正确的。

如何使用 ray 并修改上面的代码以并行运行? (也许通过将large_list拆分成块,块的数量等于cpu的数量)

编辑:这个问题有两个重要标准

函数f需要接受多个参数 可能需要使用ray.put(large_list),以便larg_list 变量可以存储在共享内存中,而不是复制到每个处理器中

【问题讨论】:

【参考方案1】:

补充上面桑所说的:

Ray Distributed multiprocessing.Pool 支持固定大小的 Ray Actor 池,以便于并行化。

import numpy as np
import time

import ray
from ray.util.multiprocessing import Pool
pool = Pool()

def f(x):
    # time.sleep(1)
    return 1.5 * 2 - x

def my_func_par(large_list):
    pool.map(f, large_list)

def my_func_seq(large_list):
    [f(i) for i in large_list]

my_list = np.arange(1, 10000)

s = time.time()
my_func_par(my_list)
print('Parallel time: ' + str(time.time() - s))

s = time.time()
my_func_seq(my_list)
print('Sequential time: ' + str(time.time() - s))

使用上面的代码,my_func_par 运行得更快(大约 0.1 秒)。如果您使用代码并让f(x) 变慢time.sleep 之类的东西,您会看到多处理的明显优势。

【讨论】:

如果f 有多个参数,你将如何更改pool.map() 应该和原版的map()一样。 geeksforgeeks.org/…【参考方案2】:

parallized 版本较慢的原因是运行 ray 任务不可避免地需要运行开销(尽管它付出了很多努力来优化它)。这是因为并行运行需要有进程间通信、序列化等。

话虽如此,如果您的函数真的很快(运行函数所花费的时间比分布式计算中的其他开销更少,在这种情况下,您的代码就是完美的情况,因为函数 f 真的很小。我假设它运行该函数将花费不到一微秒的时间)。

这意味着您应该使 f 函数的计算量更大,以便从并行化中受益。您提出的解决方案可能不起作用,因为即使在那之后,函数 f 可能仍然足够轻量级,具体取决于您的列表大小。

【讨论】:

以上是关于如何使用 python Ray 在一个大列表上并行化?的主要内容,如果未能解决你的问题,请参考以下文章

使用 Ray 并行化大型程序的正确方法

在 Python 中使用 Ray 并行化任务,得到“Aborted (core dumped)”

使用ray进行最近邻搜索的并行化

如何在并行进程(python)中将项目附加到列表中?

如何修复 ray 不断增长的内存使用量?

Python:如何并行获取请求以最小化时间?