如何实现具有多个消费者的单个队列

Posted

技术标签:

【中文标题】如何实现具有多个消费者的单个队列【英文标题】:How to implement a single queue with multiple consumers 【发布时间】:2022-01-14 15:34:01 【问题描述】:

我有一个列表list_of_params 和一个函数run_train(),它接收来自list_of_params 的项目(例如run_train(list_of_params[0]))。我可以一次将run_train() 发送到多个 GPU。所以我想知道是否有任何可以并行化的单个队列的实现。

如果这还不够清楚,请想象以下场景:

“超市有一个顾客队列,但有5个收银员。一旦一个收银员空闲,它就会处理队列中下一个顾客的产品。这与每个收银员都有自己的线路相反。”

如果需要,我可以提供更多详细信息。

谢谢!

【问题讨论】:

【参考方案1】:

试试queue 模块。 '线程安全的多生产者、多消费者队列'。

【讨论】:

我不理解“线程安全的多生产者、多消费者队列”。这是一种设计模式吗? 我的错,“线程安全的多生产者、多消费者队列”是指为您完成锁定和同步,并通过简单的 API(基本上是文档的第一段)公开。将它与生产者 - 消费者流程一起使用,或者您想要的任何方式。您可以使用任务数据预填充队列,然后让线程一一挑选任务。 IDK 您正在使用什么框架,以及它如何将任务分派到 GPU,也许它内部有某种队列,您可以使用。 glfh 如果你使用多处理,那么你需要使用multiprocessing.Queue【参考方案2】:

下面是一个使用带有几个生产者和消费者进程的队列的示例:

from multiprocessing import Process, Queue, Event
#if you use threading instead of multiprocessing, use queue.Queue rather than multiprocessing.Queue
from queue import Empty
from time import sleep
from random import random

def producer(stopflag, out_queue):
    while True:
        if stopflag.is_set():
            break
        sleep(random()) #sleep anywhere from 0-1 sec avg 0.5 (producer will produce on average a maximum of 2 outputs / sec)
        out_queue.put(random()) #may wait if the queue is currently full, thereby reducing production rate.
        
def consumer(stopflag, in_queue):
    while True:
        if stopflag.is_set():
            break
        x = in_queue.get()
        sleep(x*2) #consumers work twice slower than producers averaging 1 item per sec

def main():
    stopflag = Event()
    
    shared_q = Queue(maxsize=100) # if the queue fills up, it will force the producers
                                  #   to wait for the consumers to catch up. Otherwise, it
                                  #   may grow infinitely (until the computer runs out of memory)
    
    #2 producers
    producers = [Process(target=producer, args=(stopflag, shared_q)) for _ in range(2)]
    #4 consumers
    consumers = [Process(target=consumer, args=(stopflag, shared_q)) for _ in range(4)]
    
    for process in producers + consumers:
        process.start()
        
    sleep(20) #let them work a while
    
    stopflag.set() #tell them to stop
    
    for process in producers + consumers: #wait for them to stop
        process.join()
        
    #empty unfinished tasks from the queue so its thread can exit normally
    #(it's good practice to clean up resources kind of like closing files when done with them)
    try:
        while True:
            shared_q.get_nowait()
    except Empty:
        pass
    
if __name__ == '__main__':
    main()

【讨论】:

以上是关于如何实现具有多个消费者的单个队列的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 如何实现对同一个应用的多个节点进行广播

具有多个消费者但只有一个活动的 MQ 队列

C ++ 11中无锁的多生产者/消费者队列

如何从单个数据框中创建具有不同内容的多个 PDF?

RabbitMQ / AMQP:单个队列,同一消息的多个消费者?

什么是阻塞队列?阻塞队列的实现原理是什么?如何使用阻塞队列来实现生产者-消费者模型?