如何在 python 的类中使用光线并行性?

Posted

技术标签:

【中文标题】如何在 python 的类中使用光线并行性?【英文标题】:How to use ray parallelism within a class in python? 【发布时间】:2021-01-26 23:04:48 【问题描述】:

我想使用 ray task 方法而不是 ray actor 方法来并行化类中的方法。后者的原因似乎需要改变类的实例化方式(如here 所示)。下面是一个玩具代码示例,以及错误

import numpy as np
import ray


class MyClass(object):
    
    def __init__(self):
        ray.init(num_cpus=4)
    
    @ray.remote
    def func(self, x, y):
        return x * y
    
    def my_func(self):
        a = [1, 2, 3]
        b = np.random.normal(0, 1, 10000)
        result = []
        # we wish to parallelise over the array `a`
        for sub_array in np.array_split(a, 3):
            result.append(self.func.remote(sub_array, b))
        return result

mc = MyClass()
mc.my_func()
>>> TypeError: missing a required argument: 'y'

出现错误是因为 ray 似乎没有“意识到”该类,因此它需要一个参数 self

如果我们不使用类,代码可以正常工作:

@ray.remote
def func(x, y):
    return x * y

def my_func():
    a = [1, 2, 3, 4]
    b = np.random.normal(0, 1, 10000)
    result = []
    # we wish to parallelise over the list `a`
    # split `a` and send each chunk to a different processor
    for sub_array in np.array_split(a, 4):
        result.append(func.remote(sub_array, b))
    return result


res = my_func()
ray.get(res)
>>> [array([-0.41929678, -0.83227786, -2.69814232, ..., -0.67379119,
        -0.79057845, -0.06862196]),
 array([-0.83859356, -1.66455572, -5.39628463, ..., -1.34758239,
        -1.5811569 , -0.13724391]),
 array([-1.25789034, -2.49683358, -8.09442695, ..., -2.02137358,
        -2.37173535, -0.20586587]),
 array([ -1.67718712,  -3.32911144, -10.79256927, ...,  -2.69516478,
         -3.1623138 ,  -0.27448782])]```

正如预期的那样,我们看到输出是一个包含 4 个数组的列表。如何让MyClass 使用 ray 进行并行处理?

【问题讨论】:

【参考方案1】:

一些提示:

    一般建议你只在python中的函数或类上使用ray.remote装饰器(不是绑定方法)。

    在函数的构造函数中调用ray.init 时应该非常小心,因为ray.init 不是 幂等的(这意味着如果您实例化多个@987654326 实例,您的程序将会失败@)。相反,您应该确保 ray.init 在您的程序中只运行一次。

我认为这里有 2 种方法可以达到你想要的结果。

您可以将func 移出类,因此它成为一个函数而不是绑定方法。请注意,在这种方法中MyClass 将被序列化,这意味着funcMyClass 所做的更改将不会反映在函数之外的任何地方。在您的简化示例中,这似乎不是问题。这种方法最容易实现最大并行度。

@ray.remote
def func(obj, x, y):
    return x * y


class MyClass(object):
    def my_func(self):
        ...
        # we wish to parallelise over the array `a`
        for sub_array in np.array_split(a, 3):
            result.append(func.remote(self, sub_array, b))
        return result

您可以考虑的另一种方法是使用async actors。在这种方法中,ray actor 将通过 asyncio 处理并发,但是这个comes with the limitations of asyncio。

@ray.remote(num_cpus=4)
class MyClass(object):
    async def func(self, x, y):
        return x * y
    
    def my_func(self):
        a = [1, 2, 3]
        b = np.random.normal(0, 1, 10000)
        result = []
        # we wish to parallelise over the array `a`
        for sub_array in np.array_split(a, 3):
            result.append(self.func.remote(sub_array, b))
        return result

【讨论】:

以上是关于如何在 python 的类中使用光线并行性?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Python 的 Tkinter 的类中使用 slider.on_changed()

在 boost.python 中;如何公开包含在另一个类中的类(通过组合)?

在 boost.python 中;如何公开包含在另一个类中的类(通过组合)?

如何在单独的类中引用我的窗口变量?

如何在ScalaTest中按顺序运行类中的测试?

如何在 C 中使用线程显示并行性?