并行执行列表中每个对象的方法

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并行执行列表中每个对象的方法相关的知识,希望对你有一定的参考价值。

我有一个对象列表,我想并行执行每个对象中的方法。该方法修改对象的属性。例如:

class Object:
    def __init__(self, a):
        self.a = a
    def aplus(self):
        self.a += 1

object_list = [Object(1), Object(2), Object(3)]

# I want to execute this in parallel
for i in range(len(object_list)):
    object_list[i].aplus() 

我尝试了以下方法:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

executor = ProcessPoolExecutor(max_workers=3)
res = executor.map([obj.aplus for obj in object_list])

哪个不起作用,使对象保持不变。我认为这是因为只能通过多处理来复制和访问对象。任何的想法?

非常感谢!

编辑:据说对象非常大,所以最好避免将它们复制到每个进程。这些方法也被认为非常占用CPU,因此应该使用多个进程而不是线程。在这些条件下,我相信没有解决方案,因为多处理不能共享内存,线程不能使用多个CPU。我想显示错误。

答案

这是我的答案,使用threading

from threading import Thread

class Object:
    def __init__(self, a):
        self.a = a
    def aplus(self):
        self.a += 1

object_list = [Object(1), Object(2), Object(3)]

# A list containing all threads we will create
threads = []

# Create a thread for every objects
for obj in object_list:
    thread = Thread(target=obj.aplus)
    thread.daemon = True
    thread.start()
    threads.append(thread)

# Wait for all threads to finish before continuing
for thread in threads:
    thread.join();

# prints results
for obj in object_list:
    print(obj.a)
另一答案

这是一个使用Pool.map的工作示例:

import multiprocessing

class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1

    def __str__(self):
        return str(self.a)

def worker(obj):
    obj.aplus()
    return obj

if __name__ == "__main__":
    object_list = [Object(1), Object(2), Object(3)]

    try:
        processes = multiprocessing.cpu_count()
    except NotImplementedError:
        processes = 2

    pool = multiprocessing.Pool(processes=processes)
    modified_object_list = pool.map(worker, object_list)

    for obj in modified_object_list:
        print(obj)

打印:

2
3
4
另一答案

我认为这是因为只能通过多处理来复制和访问对象。

这是完全正确的,也是答案的一半。因为这些过程是孤立的,所以每个过程都有自己的object_list副本。这里的一个解决方案是使用ThreadPoolExecutor(线程都共享相同的object_list)。

使用它的语法与您尝试使用的语法略有不同,但这可以按预期工作:

executor = ThreadPoolExecutor(max_workers=3)
res = executor.map(Object.aplus, object_list)

如果你真的想使用ProcessPoolExecutor那么你需要以某种方式从过程中获取数据。最简单的方法是使用返回值的函数:

from concurrent.futures import ProcessPoolExecutor


class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1
        return self.a


if __name__ == '__main__':

    object_list = [Object(1), Object(2), Object(3)]

    executor = ProcessPoolExecutor(max_workers=3)
    for result in executor.map(Object.aplus, object_list):
        print("I got: " + str(result))

您甚至可以使用maping函数返回self,然后将这些返回的对象放回到object_list中。所以完整的多处理解决方案看起来像:

from concurrent.futures import ProcessPoolExecutor


class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1
        return self


if __name__ == '__main__':

    object_list = [Object(1), Object(2), Object(3)]

    executor = ProcessPoolExecutor(max_workers=3)
    object_list = list(executor.map(Object.aplus, object_list))

以上是关于并行执行列表中每个对象的方法的主要内容,如果未能解决你的问题,请参考以下文章

Sleep() 方法后的代码片段没有被执行

如何在 python 中并行化以下代码片段?

在每个用户的Rails中使用片段缓存

ViewPager的两个片段并行运行

jQuery的$.ajax方法支持同时创建多个异步ajax请求吗?而且这些ajax对象必须是并行处理的

如何在列表中添加附加字典的每个并行进程的时间戳?