在类中使用 multiprocess.Pool.map

Posted

技术标签:

【中文标题】在类中使用 multiprocess.Pool.map【英文标题】:use multiprocess.Pool.map in a class 【发布时间】:2022-01-02 15:48:39 【问题描述】:
from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30)
        pool.close()
        pool.join()

    def run(self, i):
        self.count += i
        return self.count

a = Acc()
a.multiprocess()
print(a.count)

我想输出应该是30,但它是0。我不知道multiprocess.Pool.map 是如何工作的,以及它是如何与班级合作的。请详细告诉我。

顺便说一句,如果我在里面打印 self.count

    def run(self, i):
        print(self.count)
        self.count += i
        return self.count

它给了

0
1
0
1
00

1
10

1
00

11

0
1
00

1001



11

0
10

10

1

更令人困惑,为什么会有 0 和 1 混合。

【问题讨论】:

Thread safety 可能会给你帮助 多处理通过在进程之间复制对象来工作。池进程中的self与主进程中的self不一样。 【参考方案1】:

首先让我们通过在 print 语句中添加flush=True 让打印输出更加有序,这样每个打印输出都占据自己的行:

from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

打印:

i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 1
a.count = 0

分析

现在让我们分析发生了什么。 a = Acc() 的创建由主进程完成。正在执行的多处理池进程是不同的地址空间,因此当它们执行您的工作函数时,self.run,对象a 必须序列化/反序列化到将执行工作函数的进程的地址空间。在那个新的地址空间self.count 遇到初始值 0,它被打印出来,然后递增到 1 并返回。同时,并行地,对象a 被序列化/反序列化 3 次,因此其他 3 个进程可以执行相同的处理,它们也将打印 0 并返回值 1。但是由于所有这些递增都在发生对于存在于主进程地址空间以外的地址空间中的a 的副本,主进程中的原始a 保持不变。所以随着map函数继续执行,a被进一步从主进程复制到处理池,它总是和self.count = 0在一起。

那么问题就变成了为什么有时会打印i = 1 而不是i = 0

当您使用 iterable 执行 map 并指定 30 个元素时,默认情况下,这 30 个任务根据 chunksize 参数划分为“块”你提供的。由于我们采用默认 chunksize=Nonemap 函数会根据 iterable 的长度和池大小计算默认的 chunksize 值:

chunksize, remainder = divmod(len(iterable), 4 * pool_size)
if remainder:
    chunksize += 1

在这种情况下,池大小为 4,因此 chunksize 将被计算为 2。这意味着多处理池中的每个进程一次执行两个任务队列中的任务,因此它们正在处理同一个对象两次使用不同的 i 值(被忽略)。

如果我们指定 chunksize 为 1,以便每个进程一次只处理一个对象,那么我们有:

from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30, chunksize=1)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

打印;

i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
a.count = 0

如果我们将 chunksize 指定为 30,以便单个进程针对单个对象处理所有任务:

from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30, chunksize=30)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

打印:

i = 0
i = 1
i = 2
i = 3
i = 4
i = 5
i = 6
i = 7
i = 8
i = 9
i = 10
i = 11
i = 12
i = 13
i = 14
i = 15
i = 16
i = 17
i = 18
i = 19
i = 20
i = 21
i = 22
i = 23
i = 24
i = 25
i = 26
i = 27
i = 28
i = 29
a.count = 0

在最后一种情况下,当然不会发生多处理,因为多处理池的单个进程处理了所有提交的任务。

【讨论】:

感谢您的解释。我不明白。【参考方案2】:

如果强制使用多处理,我将遵循以下方法。 由于我们希望并行运行我们的代码,我不希望在 map 中传递实例方法。 我会将run 转换为函数而不是方法。它将采用 and 参数并返回相同的值。

def run(i):
    return i

然后在multiprocess 方法中,我将循环获取pool.map 的返回值并将then 添加到self.count

def multiprocess(self):
    pool = Pool(processes=4)
    for r_value in pool.map(run, [1]*30):
        self.count += r_value
    pool.close()
    pool.join()

输出为

30

Process finished with exit code 0

完整代码:

from multiprocessing import Pool

def run(i):
    return i

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        for r_value in pool.map(run, [1]*30):
            self.count += r_value
        pool.close()
        pool.join()



if __name__ =='__main__':
    a = Acc()
    a.multiprocess()
    print(a.count)

【讨论】:

你有一些严重的缩进错误,你没有解释为什么 OP 得到他们得到的输出。

以上是关于在类中使用 multiprocess.Pool.map的主要内容,如果未能解决你的问题,请参考以下文章

在类中使用联合

如何在类中使用 Pyomo 装饰器

友元在类中的使用

在类中使用自定义排序时出现编译错误 [重复]

如何在类中使用结构

在类中使用 malloc/free