为啥 Python 多处理队列会弄乱字典?

Posted

技术标签:

【中文标题】为啥 Python 多处理队列会弄乱字典?【英文标题】:Why does Python multiprocessing Queue messes up dictionaries?为什么 Python 多处理队列会弄乱字典? 【发布时间】:2015-01-11 13:14:31 【问题描述】:

我正在尝试在 python 中创建一个多进程、多线程程序。到目前为止,我已经成功了,但是我遇到了一个一直困扰我的问题。

我有 3 节课。主类是管理器,它创建一个或多个子进程(Subprocess 类)并通过专用的 multiprocessing.Queue 连接到每个子进程。然后,它通过队列发送这些子进程命令以创建套接字管理线程(Server_Thread 类)。 Server_Thread的配置选项在Manager类中创建,并以字典的形式通过队列传递给子进程。

代码如下

import threading
import multiprocessing
import socket
import time


class Server_Thread(threading.Thread):
    def __init__(self, client_config):
        threading.Thread.__init__(self)
        self.address = client_config['address']
        self.port = client_config['port']

    def run(self):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        print "Binding to: local host, port = ", self.port 
        self.socket.bind((socket.gethostname(), self.port))
        self.socket.listen(1)

        self.running = True
        while self.running:    
            client_socket, client_address = self.socket.accept()
            # do stuff

    def stop(self):
        self.running = False


class Subprocess(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.server_thread_list = []

    def run(self):
        self.running = True
        while self.running:
            command = self.queue.get()
            if command[0] == "create_client":
                server_thread = Server_Thread(command[1])
                server_thread.start()
                self.server_thread_list.append(server_thread)
            elif command[0] == "terminate":
                self.running = False
        for server_thread in self.server_thread_list:
            server_thread.stop()
            server_thread.join()


class Manager:
    def __init__(self):
        self.client_config =      
        self.client_config['junk'] = range(10000)    # actually contains lots of stuff
        self.client_config['address'] = 'localhost'

    def run(self):
        current_bind_port = 40001
        self.queue = multiprocessing.Queue()
        subprocess = Subprocess(self.queue)
        subprocess.start()
        for i in range(20):
            print "creating socket thread at port =", current_bind_port
            self.client_config['port'] = current_bind_port
            self.queue.put(("create_client", self.client_config.copy()))    # pass a dictionary copy
            current_bind_port += 1
        time.sleep(10)
        self.queue.put(("terminate", None))
        subprocess.join()


if __name__ == "__main__":
    manager = Manager()
    manager.run()

问题是当我运行它时,有时它运行正常,但有时,配置字典在队列中被弄乱了。我认为这与队列被填充的速度和被清空的速度有关,并且我认为它会在没有警告的情况下溢出。

经过一些重组的输出(多个进程与打印混为一谈)

>Python temp.py
creating socket thread at port = 40001
creating socket thread at port = 40002
creating socket thread at port = 40003
creating socket thread at port = 40004
creating socket thread at port = 40005
creating socket thread at port = 40006
creating socket thread at port = 40007
creating socket thread at port = 40008
creating socket thread at port = 40009
creating socket thread at port = 40010
creating socket thread at port = 40011
creating socket thread at port = 40012
creating socket thread at port = 40013
creating socket thread at port = 40014
creating socket thread at port = 40015
creating socket thread at port = 40016
creating socket thread at port = 40017
creating socket thread at port = 40018
creating socket thread at port = 40019
creating socket thread at port = 40020  << OK

Binding to: local host, port =  40001
Binding to: local host, port =  40020  << NOT OK from here
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020

Exception in thread Thread-4:
Traceback (most recent call last):
  File "C:\Python27\lib\threading.py", line 810, in __bootstrap_inner
    self.run()
  File "Y:\cStation\Python\iReact connection PoC\temp.py", line 18, in run
    self.socket.bind((socket.gethostname(), self.port))
  File "C:\Python27\lib\socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted

.... Get this message several more times ....

如果我在将每个 create_thread 命令放入队列后插入“time.sleep(0.1)”命令,问题似乎会变得不那么频繁(但不会完全消失)。

有趣的是,带有"create_thread" 命令的元组传输没有问题,问题似乎是值字典。有没有办法确保在没有time.wait() 的情况下将值放入队列之前可以写入队列?我试过输入一个while not self.queue.empty(): pass,但在几个命令之后似乎永远卡住了......

【问题讨论】:

【参考方案1】:

我在发送包含 **big numpy 数组** 的字典时遇到了这个问题。经过对不同事物的大量尝试和测试,我想出了以下几点:

“不要通过多处理队列发送巨大或大的对象”

但是你可以做一些事情:

1- 在发送大对象后创建延迟,并确保队列腌制这个大对象(或消费者收到此消息)

2- 复制您的对象并在通过队列发送另一个对象之前创建延迟

3- 对于字典,确保在通过队列发送字典时不要更改字典(使用复制、延迟、锁定等)

希望对你有帮助

但是,需要进一步调查以澄清根本原因。

【讨论】:

以上是关于为啥 Python 多处理队列会弄乱字典?的主要内容,如果未能解决你的问题,请参考以下文章

为啥 matplotlib.PatchCollection 会弄乱补丁的颜色?

为啥 OPTIND 会弄乱我的位置参数?

为啥 fread 会弄乱我的字节顺序?

为啥 PHP 会弄乱我的 CSS?

为啥浏览器的后退按钮会弄乱我的 Vue 组件?

为啥在python里推荐使用多进程而不是多线程