Python 在多处理进程之间共享一个双端队列

Posted

技术标签:

【中文标题】Python 在多处理进程之间共享一个双端队列【英文标题】:Python sharing a deque between multiprocessing processes 【发布时间】:2018-06-25 10:31:57 【问题描述】:

在过去的一小时里,我一直在研究以下问题,但没有任何运气:

Python sharing a dictionary between parallel processes

multiprocessing: sharing a large read-only object between processes?

multiprocessing in python - sharing large object (e.g. pandas dataframe) between multiple processes

我编写了一个非常基本的测试文件来说明我想要做什么:

from collections import deque
from multiprocessing import Process
import numpy as np


class TestClass:
    def __init__(self):
        self.mem = deque(maxlen=4)
        self.process = Process(target=self.run)

    def run(self):
        while True:
            self.mem.append(np.array([0, 1, 2, 3, 4]))


def print_values(x):
    while True:
        print(x)


test = TestClass()
process = Process(target=print_values(test.mem))

test.process.start()
process.start()

目前输出如下:

deque([], maxlen=4)

如何从主代码或运行“print_values”的进程访问内存值?

【问题讨论】:

您需要阅读exchanging objectssharing state between processes。您的子进程每个都获得双端队列的分叉副本,它们之间没有进一步的连接。你可能想要managed Queue 【参考方案1】:

所以通过结合@bivouac0 提供的代码和@Marijn Pieters 发布的评论,我想出了以下解决方案:

from multiprocessing import Process, Manager, Queue


class testClass:
    def __init__(self, maxlen=4):
        self.mem = Queue(maxsize=maxlen)
        self.process = Process(target=self.run)

    def run(self):
        i = 0

        while True:
            self.mem.empty()
            while not self.mem.full():
                self.mem.put(i)
                i += 1


def print_values(queue):
    while True:
        values = queue.get()
        print(values)


if __name__ == "__main__":
    test = testClass()
    print_process = Process(target=print_values, args=(test.mem,))

    test.process.start()
    print_process.start()

    test.process.join()
    print_process.join()

【讨论】:

【参考方案2】:

很遗憾,multiprocessing.Manager() 不支持deque,但它可以与listdictQueueValueArray 一起使用。 list 相当接近,所以我在下面的示例中使用了它..

from multiprocessing import Process, Manager, Lock
import numpy as np

class TestClass:
    def __init__(self):
        self.maxlen = 4
        self.manager = Manager()
        self.mem = self.manager.list()
        self.lock = self.manager.Lock()
        self.process = Process(target=self.run, args=(self.mem, self.lock))

    def run(self, mem, lock):
        while True:
            array = np.random.randint(0, high=10, size=5)
            with lock:
                if len(mem) >= self.maxlen:
                    mem.pop(0)
                mem.append(array)

def print_values(mem, lock):
    while True:
        with lock:
            print mem

test = TestClass()
print_process = Process(target=print_values, args=(test.mem, test.lock))
test.process.start()
print_process.start()

test.process.join()
print_process.join()

您必须小心使用管理器对象。您可以像使用它们引用的对象一样使用它们,但您不能执行类似...mem = mem[-4:] 来截断值,因为您正在更改引用的对象。

至于编码风格,我可能会将Manager 对象移到类外或将print_values 函数移到其中,但例如,这是可行的。如果您移动东西,请注意您不能在run 方法中直接使用self.mem。你需要在启动进程时传入它,否则python在后台执行的fork会创建一个新实例并且不会被共享。

希望这适用于您的情况,如果没有,我们可以尝试调整一下。

【讨论】:

感谢您抽出宝贵时间回答这个问题。我已经实现了您的代码,但出现以下错误: _pickle.PicklingError: Can't pickle : attribute lookup weakref on builtins failed 我尝试使用以下代码,但没有任何运气: test.process = Process(target=test.run, args=(test.mem, test.lock)) 我已经使用您提供的部分代码和@Marijn Pieters 给我的提示解决了这个问题。我将在下面发布我的解决方案:)

以上是关于Python 在多处理进程之间共享一个双端队列的主要内容,如果未能解决你的问题,请参考以下文章

Python实现双端队列

python:双端队列与列表性能比较

第十一节 双端队列的概念和python代码实现

python 实现双端队列

Python双端队列 实现回文检测

Python双端队列追加问题