需要一个线程安全的异步消息队列
Posted
技术标签:
【中文标题】需要一个线程安全的异步消息队列【英文标题】:Need a thread-safe asynchronous message queue 【发布时间】:2013-05-27 07:26:51 【问题描述】:我正在寻找一个 Python 类(最好是标准语言的一部分,而不是第 3 方库)来管理异步“广播风格”消息传递。
我将有一个线程将消息放入队列('putMessageOnQueue' 方法不得阻塞),然后有多个其他线程都将等待消息,大概调用了一些阻塞'waitForMessage' 函数。当一条消息被放入队列时,我希望每个等待的线程都获得自己的消息副本。
我查看了内置的Queue
类,但我认为这不合适,因为使用消息似乎涉及将它们从队列中删除,因此只有 1 个客户端线程会看到每个消息。
这似乎应该是一个常见的用例,有人可以推荐一个解决方案吗?
【问题讨论】:
我相信您可以构建自己的类来跟踪哪个线程收到了哪个消息,而不会出现很多问题。 【参考方案1】:我认为解决此问题的典型方法是为每个线程使用单独的消息队列,并将消息推送到先前已注册有兴趣接收此类消息的每个队列。
这样的东西应该可以工作,但它是未经测试的代码......
from time import sleep
from threading import Thread
from Queue import Queue
class DispatcherThread(Thread):
def __init__(self, *args, **kwargs):
super(DispatcherThread, self).__init__(*args, **kwargs)
self.interested_threads = []
def run(self):
while 1:
if some_condition:
self.dispatch_message(some_message)
else:
sleep(0.1)
def register_interest(self, thread):
self.interested_threads.append(thread)
def dispatch_message(self, message):
for thread in self.interested_threads:
thread.put_message(message)
class WorkerThread(Thread):
def __init__(self, *args, **kwargs):
super(WorkerThread, self).__init__(*args, **kwargs)
self.queue = Queue()
def run(self):
# Tell the dispatcher thread we want messages
dispatcher_thread.register_interest(self)
while 1:
# Wait for next message
message = self.queue.get()
# Process message
# ...
def put_message(self, message):
self.queue.put(message)
dispatcher_thread = DispatcherThread()
dispatcher_thread.start()
worker_threads = []
for i in range(10):
worker_thread = WorkerThread()
worker_thread.start()
worker_threads.append(worker_thread)
dispatcher_thread.join()
【讨论】:
完美,效果很好!很遗憾没有现成的版本,但我想一旦有人解释清楚(就像你所做的那样),原理就不会那么复杂。 @codebox 好吧,multiprocessing
模块提供了更好的支持,但这是针对子进程而不是线程的。我猜这是因为进程间通信通常比线程间通信更复杂,因为线程自然共享同一个堆。
如果您需要一位作家的广播,队列是最佳解决方案吗?也许最好使用 write-once/read-many 结构,每个进程可以在自己的空闲时间并发读取?【参考方案2】:
我认为这是一个更直接的示例(取自 Python Lib 中的队列示例)
from threading import Thread
from Queue import Queue
num_worker_threads = 2
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.daemon = True
t.start()
for item in source():
q.put(item)
q.join() # block until all tasks are done
【讨论】:
这如何满足问题的要求?他明确表示队列不起作用,因为每个线程都需要该项目的副本。以上是关于需要一个线程安全的异步消息队列的主要内容,如果未能解决你的问题,请参考以下文章