Python 多处理和共享计数器
Posted
技术标签:
【中文标题】Python 多处理和共享计数器【英文标题】:How to increment a shared counter from multiple processes? 【发布时间】:2011-01-06 01:02:54 【问题描述】:我在使用多处理模块时遇到了问题。我正在使用具有 map 方法的工作人员池从大量文件中加载数据,并为每个文件使用自定义函数分析数据。每次处理一个文件时,我都希望更新一个计数器,以便我可以跟踪还有多少文件需要处理。 这是示例代码:
def analyze_data( args ):
# do something
counter += 1
print counter
if __name__ == '__main__':
list_of_files = os.listdir(some_directory)
global counter
counter = 0
p = Pool()
p.map(analyze_data, list_of_files)
我找不到解决办法。
【问题讨论】:
【参考方案1】:我正在使用 PyQT5 中的进程栏,所以我同时使用线程和池
import threading
import multiprocessing as mp
from queue import Queue
def multi(x):
return x*x
def pooler(q):
with mp.Pool() as pool:
count = 0
for i in pool.imap_unordered(ggg, range(100)):
print(count, i)
count += 1
q.put(count)
def main():
q = Queue()
t = threading.Thread(target=thr, args=(q,))
t.start()
print('start')
process = 0
while process < 100:
process = q.get()
print('p',process)
if __name__ == '__main__':
main()
我把这个放在 Qthread worker 中,它可以在可接受的延迟下工作
【讨论】:
【参考方案2】:一个极其简单的例子,从 jkp 的答案改变:
from multiprocessing import Pool, Value
from time import sleep
counter = Value('i', 0)
def f(x):
global counter
with counter.get_lock():
counter.value += 1
print("counter.value:", counter.value)
sleep(1)
return x
with Pool(4) as p:
r = p.map(f, range(1000*1000))
【讨论】:
【参考方案3】:更快的 Counter 类,无需两次使用 Value 的内置锁
class Counter(object):
def __init__(self, initval=0):
self.val = multiprocessing.RawValue('i', initval)
self.lock = multiprocessing.Lock()
def increment(self):
with self.lock:
self.val.value += 1
@property
def value(self):
return self.val.value
https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.Value https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.RawValue
【讨论】:
与Value
与lock=True
基本相同,但这段代码更清晰。【参考方案4】:
问题是 counter
变量在您的进程之间没有共享:每个单独的进程都在创建自己的本地实例并增加它。
请参阅文档中的this section,了解可用于在进程之间共享状态的一些技术。在您的情况下,您可能希望在您的工作人员之间共享一个 Value
实例
这是您示例的工作版本(带有一些虚拟输入数据)。请注意,它使用了我在实践中会尽量避免的全局值:
from multiprocessing import Pool, Value
from time import sleep
counter = None
def init(args):
''' store the counter for later use '''
global counter
counter = args
def analyze_data(args):
''' increment the global counter, do something with the input '''
global counter
# += operation is not atomic, so we need to get a lock:
with counter.get_lock():
counter.value += 1
print counter.value
return args * 10
if __name__ == '__main__':
#inputs = os.listdir(some_directory)
#
# initialize a cross-process counter and the input lists
#
counter = Value('i', 0)
inputs = [1, 2, 3, 4]
#
# create the pool of workers, ensuring each one receives the counter
# as it starts.
#
p = Pool(initializer = init, initargs = (counter, ))
i = p.map_async(analyze_data, inputs, chunksize = 1)
i.wait()
print i.get()
【讨论】:
@jkp,如果没有全局变量,你会怎么做? - 我正在尝试使用一个类,但它并不像看起来那么容易。见***.com/questions/1816958/… 不幸的是,这个例子似乎有缺陷,因为counter.value += 1
在进程之间不是原子的,所以如果在几个进程中运行足够长的时间,这个值就会出错
与 Eli 所说的一致,Lock
必须围绕 counter value += 1
语句。见***.com/questions/1233222/…
注意应该是with counter.get_lock()
,而不是with counter.value.get_lock():
@jkp,正如@Jinghao-shi 所说,counter.value.get_lock()
会产生AttributeError: 'int' object has no attribute 'get_lock'
【参考方案5】:
没有竞争条件错误的计数器类:
class Counter(object):
def __init__(self):
self.val = multiprocessing.Value('i', 0)
def increment(self, n=1):
with self.val.get_lock():
self.val.value += n
@property
def value(self):
return self.val.value
【讨论】:
对于与joblib
s Parallel
一起使用的类似代码(此答案中的代码不适用于joblib
),请参阅github.com/davidheryanto/etc/blob/master/python-recipes/…
我还将return self
添加到increment
函数以启用链接以上是关于Python 多处理和共享计数器的主要内容,如果未能解决你的问题,请参考以下文章