我可以使用池回调实现多处理计数器吗?
Posted
技术标签:
【中文标题】我可以使用池回调实现多处理计数器吗?【英文标题】:Can I implement a counter for multiprocessing using pool callback? 【发布时间】:2015-08-27 09:47:31 【问题描述】:我在 Google 上搜索了一下,以了解如何正确构建计数器以跟踪已完成工作的进度。到目前为止,似乎所有答案都涉及使用lock
和Value
。
我想知道是否可以使用回调来实现它。似乎回调是在主进程中执行的,而不是工人所在的子进程。我可以进一步假设它是在同一个线程中执行的,因此根本没有竞争条件吗?
import time
import multiprocessing
import os
Pool = multiprocessing.Pool
def sqr(a):
time.sleep(0.5)
print 'local '.format(os.getpid())
return a * a
pool = Pool(processes=4)
class Counter(object):
def __init__(self):
self.value = 0
def incr(self, x):
self.value += 1
print 'count '.format(self.value)
print 'callback '.format(os.getpid())
counter = Counter()
r = [pool.apply_async(sqr, (x,), callback=counter.incr) for x in range(10)]
pool.close()
pool.join()
local 27155local 27154local 27156
count 1
callback 27152
count 2
callback 27152
count 3
callback 27152
local 27153
count 4
callback 27152
local 27155
count 5
callback 27152
local 27156
local 27154
count 6
callback 27152
count 7
callback 27152
local 27153
count 8
callback 27152
local 27155
count 9
callback 27152
local 27156
count 10
callback 27152
main 27152
main count 10
Process finished with exit code 0
更新
好的,看来link 解释了一些回调背后的机制。
所以实际上它在主进程的不同线程上运行。
但是,我仍然可以以相同的方式实现计数器吗,因为只有 1 个线程可以修改计数器?
【问题讨论】:
Who runs the callback when using apply_async method of a multiprocessing pool?的可能重复 【参考方案1】:从@ami-tavory 评论中的 SO 链接来看,似乎回调可能都在同一个线程上调用。但是,由于文档或 api 中未指定此内容,因此我不会依赖它,因为它可能会在将来发生变化或取决于实现。
Python 没有原子增量(除了一些itertools trick that relies on the GIL),所以为了确保你是线程安全的,你必须使用锁或其他形式的同步。你为什么要避免它?它可以用作上下文管理器,这使得代码非常少:
from threading import Lock
class Counter(object):
def __init__(self):
self.value = 0
self.lock = Lock()
def incr(self, x):
with self.lock:
self.value += 1
另一种方法是使用imap_unordered
,在结果可用时循环(在主线程中)并在那里更新您的进度/计数器。
【讨论】:
【参考方案2】:或者您可以使用 imap_unordered 作为 bj0 提到的循环计数:
results = []
for count, result in enumerate(pool.imap_unordered(sqr, range(10)), 1):
results.append(result)
print(count)
就个人而言,我发现处理 imap_unordered()
返回的原始结果比处理 apply_async()
返回的 Result 对象更直接。
【讨论】:
以上是关于我可以使用池回调实现多处理计数器吗?的主要内容,如果未能解决你的问题,请参考以下文章