多个生产者,单个消费者

Posted

技术标签:

【中文标题】多个生产者,单个消费者【英文标题】:Multiple producers, single consumer 【发布时间】:2011-02-24 13:37:35 【问题描述】:

我必须开发一个多线程应用程序,其中会有多个线程,每个线程都会生成需要保存在队列中的自定义事件日志(不是 Microsoft MSMQ)。

将有另一个线程从队列中读取日志数据并对其进行操作,并使用某些信息将日志信息保存到文件中。基本上,我们在这里实现了多生产者、单消费者范式。

谁能提供有关如何在 C++ 或 C# 中实现此功能的建议。

谢谢,

【问题讨论】:

【参考方案1】:

使用System.Collections.Concurrent中定义的BlockingCollection<T>很容易做到这一点。

基本上,您创建队列以便所有线程都可以访问它:

BlockingCollection<LogRecord> LogQueue = new BlockingCollection<LogRecord>();

每个生产者都将项目添加到队列中:

while (!Shutdown)

    LogRecord rec = CreateLogRecord(); // however that's done
    LogQueue.Add(rec);

消费者也会做类似的事情:

while (!Shutdown)

    LogRecord rec = LogQueue.Take();
    // process the record

默认情况下,BlockingCollection 使用ConcurrentQueue&lt;T&gt; 作为后备存储。 ConcurrentQueue 负责线程同步,BlockingCollection 在尝试获取项目时会进行非忙等待。也就是说,如果消费者在队列中没有项目时调用Take,它会进行非忙等待(无睡眠/旋转),直到有项目可用。

【讨论】:

也许 Onsumer 获得附加条件while (!Shutdown || LogQueue.&lt;notempty&gt;()) 会很有用(不知道它在 C# 中是如何表达的)。或者,如果 LogQueue 有办法停止,只使用这个。 你可以让生产者调用CompleteAdding,这将标记集合完成添加(即不能添加更多项目)。然后消费者可以使用while (LogQueue.TryTake(out rec, Timeout.Infinite))', which means it would empty the collection and then exit. TryTake`如果集合完成添加并且队列为空,则返回False @JimMischel 消费者出队后,进行一些处理,如何将结果返回给调用进程? @gdp:当线程完成某些工作时,有很多方法可以发出通知。你如何做取决于你所说的“调用过程”是什么意思。如果您有具体问题,我建议您发布一个问题。 @JimMischel 我实际上已经发布了一个问题,目前还没有回复。无论如何感谢您的回复。【参考方案2】:

您可以使用synchronized queue(如果您有 .NET 3.5 或更早版本的代码)甚至更好的是新的ConcurrentQueue&lt;T&gt;

【讨论】:

【参考方案3】:

您正在计划的是一个经典的生产者消费者队列,其中一个线程消耗队列中的项目来做一些工作。这可以包装成一个更高级别的结构,称为“演员”或“活动对象”。

基本上,这将队列和消费项目的线程包装到一个类中,其他线程在这个类上的所有异步方法都将消息放在队列中,由参与者的线程执行。在您的情况下,该类可能有一个方法 writeData 将数据存储在队列中并触发条件变量以通知参与者线程队列中有东西。如果没有等待条件变量,actor线程会查看队列中是否有任何数据。

这是一篇关于这个概念的好文章:

http://www.drdobbs.com/go-parallel/article/showArticle.jhtml;jsessionid=UTEXJOTLP0YDNQE1GHPSKH4ATMY32JVN?articleID=225700095

【讨论】:

以上是关于多个生产者,单个消费者的主要内容,如果未能解决你的问题,请参考以下文章

一个生产/消费者问题

如何在kafka中实现多个生产者和多个消费者

具有多个消费者的生产者使用 notify() 失败

C ++ 11中无锁的多生产者/消费者队列

Java多线程(实现多线程线程同步生产者消费者)

Windows 中的两个进程单个生产者/单个消费者。互斥量、事件或信号量哪个更好