如何实现具有多个消费者的单个队列
Posted
技术标签:
【中文标题】如何实现具有多个消费者的单个队列【英文标题】:How to implement a single queue with multiple consumers 【发布时间】:2022-01-14 15:34:01 【问题描述】:我有一个列表list_of_params
和一个函数run_train()
,它接收来自list_of_params
的项目(例如run_train(list_of_params[0])
)。我可以一次将run_train()
发送到多个 GPU。所以我想知道是否有任何可以并行化的单个队列的实现。
如果这还不够清楚,请想象以下场景:
“超市有一个顾客队列,但有5个收银员。一旦一个收银员空闲,它就会处理队列中下一个顾客的产品。这与每个收银员都有自己的线路相反。”
如果需要,我可以提供更多详细信息。
谢谢!
【问题讨论】:
【参考方案1】:试试queue 模块。 '线程安全的多生产者、多消费者队列'。
【讨论】:
我不理解“线程安全的多生产者、多消费者队列”。这是一种设计模式吗? 我的错,“线程安全的多生产者、多消费者队列”是指为您完成锁定和同步,并通过简单的 API(基本上是文档的第一段)公开。将它与生产者 - 消费者流程一起使用,或者您想要的任何方式。您可以使用任务数据预填充队列,然后让线程一一挑选任务。 IDK 您正在使用什么框架,以及它如何将任务分派到 GPU,也许它内部有某种队列,您可以使用。glfh
如果你使用多处理,那么你需要使用multiprocessing.Queue
。【参考方案2】:
下面是一个使用带有几个生产者和消费者进程的队列的示例:
from multiprocessing import Process, Queue, Event
#if you use threading instead of multiprocessing, use queue.Queue rather than multiprocessing.Queue
from queue import Empty
from time import sleep
from random import random
def producer(stopflag, out_queue):
while True:
if stopflag.is_set():
break
sleep(random()) #sleep anywhere from 0-1 sec avg 0.5 (producer will produce on average a maximum of 2 outputs / sec)
out_queue.put(random()) #may wait if the queue is currently full, thereby reducing production rate.
def consumer(stopflag, in_queue):
while True:
if stopflag.is_set():
break
x = in_queue.get()
sleep(x*2) #consumers work twice slower than producers averaging 1 item per sec
def main():
stopflag = Event()
shared_q = Queue(maxsize=100) # if the queue fills up, it will force the producers
# to wait for the consumers to catch up. Otherwise, it
# may grow infinitely (until the computer runs out of memory)
#2 producers
producers = [Process(target=producer, args=(stopflag, shared_q)) for _ in range(2)]
#4 consumers
consumers = [Process(target=consumer, args=(stopflag, shared_q)) for _ in range(4)]
for process in producers + consumers:
process.start()
sleep(20) #let them work a while
stopflag.set() #tell them to stop
for process in producers + consumers: #wait for them to stop
process.join()
#empty unfinished tasks from the queue so its thread can exit normally
#(it's good practice to clean up resources kind of like closing files when done with them)
try:
while True:
shared_q.get_nowait()
except Empty:
pass
if __name__ == '__main__':
main()
【讨论】:
以上是关于如何实现具有多个消费者的单个队列的主要内容,如果未能解决你的问题,请参考以下文章