Python 在多处理进程之间共享一个双端队列
Posted
技术标签:
【中文标题】Python 在多处理进程之间共享一个双端队列【英文标题】:Python sharing a deque between multiprocessing processes 【发布时间】:2018-06-25 10:31:57 【问题描述】:在过去的一小时里,我一直在研究以下问题,但没有任何运气:
Python sharing a dictionary between parallel processes
multiprocessing: sharing a large read-only object between processes?
multiprocessing in python - sharing large object (e.g. pandas dataframe) between multiple processes
我编写了一个非常基本的测试文件来说明我想要做什么:
from collections import deque
from multiprocessing import Process
import numpy as np
class TestClass:
def __init__(self):
self.mem = deque(maxlen=4)
self.process = Process(target=self.run)
def run(self):
while True:
self.mem.append(np.array([0, 1, 2, 3, 4]))
def print_values(x):
while True:
print(x)
test = TestClass()
process = Process(target=print_values(test.mem))
test.process.start()
process.start()
目前输出如下:
deque([], maxlen=4)
如何从主代码或运行“print_values”的进程访问内存值?
【问题讨论】:
您需要阅读exchanging objects 或sharing state between processes。您的子进程每个都获得双端队列的分叉副本,它们之间没有进一步的连接。你可能想要managedQueue
。
【参考方案1】:
所以通过结合@bivouac0 提供的代码和@Marijn Pieters 发布的评论,我想出了以下解决方案:
from multiprocessing import Process, Manager, Queue
class testClass:
def __init__(self, maxlen=4):
self.mem = Queue(maxsize=maxlen)
self.process = Process(target=self.run)
def run(self):
i = 0
while True:
self.mem.empty()
while not self.mem.full():
self.mem.put(i)
i += 1
def print_values(queue):
while True:
values = queue.get()
print(values)
if __name__ == "__main__":
test = testClass()
print_process = Process(target=print_values, args=(test.mem,))
test.process.start()
print_process.start()
test.process.join()
print_process.join()
【讨论】:
【参考方案2】:很遗憾,multiprocessing.Manager()
不支持deque
,但它可以与list
、dict
、Queue
、Value
和Array
一起使用。 list
相当接近,所以我在下面的示例中使用了它..
from multiprocessing import Process, Manager, Lock
import numpy as np
class TestClass:
def __init__(self):
self.maxlen = 4
self.manager = Manager()
self.mem = self.manager.list()
self.lock = self.manager.Lock()
self.process = Process(target=self.run, args=(self.mem, self.lock))
def run(self, mem, lock):
while True:
array = np.random.randint(0, high=10, size=5)
with lock:
if len(mem) >= self.maxlen:
mem.pop(0)
mem.append(array)
def print_values(mem, lock):
while True:
with lock:
print mem
test = TestClass()
print_process = Process(target=print_values, args=(test.mem, test.lock))
test.process.start()
print_process.start()
test.process.join()
print_process.join()
您必须小心使用管理器对象。您可以像使用它们引用的对象一样使用它们,但您不能执行类似...mem = mem[-4:]
来截断值,因为您正在更改引用的对象。
至于编码风格,我可能会将Manager
对象移到类外或将print_values
函数移到其中,但例如,这是可行的。如果您移动东西,请注意您不能在run
方法中直接使用self.mem
。你需要在启动进程时传入它,否则python在后台执行的fork
会创建一个新实例并且不会被共享。
希望这适用于您的情况,如果没有,我们可以尝试调整一下。
【讨论】:
感谢您抽出宝贵时间回答这个问题。我已经实现了您的代码,但出现以下错误: _pickle.PicklingError: Can't pickle以上是关于Python 在多处理进程之间共享一个双端队列的主要内容,如果未能解决你的问题,请参考以下文章