我可以使用池回调实现多处理计数器吗?

Posted

技术标签:

【中文标题】我可以使用池回调实现多处理计数器吗?【英文标题】:Can I implement a counter for multiprocessing using pool callback? 【发布时间】:2015-08-27 09:47:31 【问题描述】:

我在 Google 上搜索了一下,以了解如何正确构建计数器以跟踪已完成工作的进度。到目前为止,似乎所有答案都涉及使用lockValue

我想知道是否可以使用回调来实现它。似乎回调是在主进程中执行的,而不是工人所在的子进程。我可以进一步假设它是在同一个线程中执行的,因此根本没有竞争条件吗?

import time
import multiprocessing
import os

Pool = multiprocessing.Pool

def sqr(a):
    time.sleep(0.5)
    print 'local '.format(os.getpid())
    return a * a

pool = Pool(processes=4)


class Counter(object):
    def __init__(self):
        self.value = 0

    def incr(self, x):
        self.value += 1
        print 'count '.format(self.value)
        print 'callback '.format(os.getpid())


counter = Counter()

r = [pool.apply_async(sqr, (x,), callback=counter.incr) for x in range(10)]
pool.close()
pool.join()

local 27155local 27154local 27156


count 1
callback 27152
count 2
callback 27152
count 3
callback 27152
local 27153
count 4
callback 27152
local 27155
count 5
callback 27152
local 27156
local 27154
count 6
callback 27152
count 7
callback 27152
local 27153
count 8
callback 27152
local 27155
count 9
callback 27152
local 27156
count 10
callback 27152
main 27152
main count 10

Process finished with exit code 0

更新

好的,看来link 解释了一些回调背后的机制。

所以实际上它在主进程的不同线程上运行。

但是,我仍然可以以相同的方式实现计数器吗,因为只有 1 个线程可以修改计数器?

【问题讨论】:

Who runs the callback when using apply_async method of a multiprocessing pool?的可能重复 【参考方案1】:

从@ami-tavory 评论中的 SO 链接来看,似乎回调可能都在同一个线程上调用。但是,由于文档或 api 中未指定此内容,因此我不会依赖它,因为它可能会在将来发生变化或取决于实现。

Python 没有原子增量(除了一些itertools trick that relies on the GIL),所以为了确保你是线程安全的,你必须使用锁或其他形式的同步。你为什么要避免它?它可以用作上下文管理器,这使得代码非常少:

from threading import Lock

class Counter(object):
    def __init__(self):
        self.value = 0
        self.lock = Lock()

    def incr(self, x):
        with self.lock:
            self.value += 1

另一种方法是使用imap_unordered,在结果可用时循环(在主线程中)并在那里更新您的进度/计数器。

【讨论】:

【参考方案2】:

或者您可以使用 imap_unordered 作为 bj0 提到的循环计数:

results = []
for count, result in enumerate(pool.imap_unordered(sqr, range(10)), 1):
    results.append(result)
    print(count)

就个人而言,我发现处理 imap_unordered() 返回的原始结果比处理 apply_async() 返回的 Result 对象更直接。

【讨论】:

以上是关于我可以使用池回调实现多处理计数器吗?的主要内容,如果未能解决你的问题,请参考以下文章

python 管道 事件 信号量 进程池(map/同步/异步)回调函数

单例模式

带计数器的多处理嵌套 for 循环

java并发

5种方法,教你判断线程池是不是全部完成

限流降级方案