使用blockingcollection和tasks .net 4 TPL的经典生产者消费者模式

Posted

技术标签:

【中文标题】使用blockingcollection和tasks .net 4 TPL的经典生产者消费者模式【英文标题】:classic producer consumer pattern using blockingcollection and tasks .net 4 TPL 【发布时间】:2011-09-24 14:49:36 【问题描述】:

请看下面的伪代码

//Single or multiple Producers produce using below method
    void Produce(object itemToQueue)
    
        concurrentQueue.enqueue(itemToQueue);
        consumerSignal.set;
    

    //somewhere else we have started a consumer like this
    //we have only one consumer
    void StartConsumer()
    
        while (!concurrentQueue.IsEmpty())
        
            if (concurrentQueue.TrydeQueue(out item))
            
                //long running processing of item
            
        
        consumerSignal.WaitOne();
    

如何移植我自古以来就使用的这种模式,以使用 taskfactory 创建的任务和 net 4 的新信号功能。换句话说,如果有人要使用 net 4 编写这种模式,它会是什么样子?伪代码很好。如您所见,我已经在使用 .net 4 concurrentQueue。如果可能,我如何使用任务并可能使用一些更新的信号机制。谢谢

感谢 Jon/Dan 解决了我的问题。甜的。 没有像过去那样的手动信号或 while(true) 或 while(itemstoProcess) 类型循环

//Single or multiple Producers produce using below method
 void Produce(object itemToQueue)
 
     blockingCollection.add(item);
 

 //somewhere else we have started a consumer like this
 //this supports multiple consumers !
 task(StartConsuming()).Start; 

 void StartConsuming()
 
     foreach (object item in blockingCollection.GetConsumingEnumerable())
     
                //long running processing of item
     
 

cancellations are handled using cancel tokens

【问题讨论】:

一个非常好的(因为它是一步一步的)解释和例子may be found here。 嗨 Gullu,请查看this code。这是一个关于如何将 BlockingCollection 用于生产者-消费者模式的简单工作示例。 【参考方案1】:

您将使用BlockingCollection<T>。文档中有一个示例。

该类是专门为使这变得微不足道而设计的。

【讨论】:

@user666490:没关系 - 你把它包装在 BlockingCollection<T> 中。如果你坚持直接做事,你基本上会在BlockingCollection<T>中重复一堆代码——为什么要重新发明***? @user666490:Jon 为您提供了针对生产者消费者问题的规范 .Net 4 解决方案。来自 MSDN:“为实现 IProducerConsumerCollection 的线程安全集合提供阻塞和边界功能。” Jon,如果你在上面提到的 msdn 页面上滚动到页面底部,你会看到下面的代码。与我在上面的帖子中展示的老式信号相比,这太蹩脚了。你不同意吗? // 使用 bc while (true) Console.WriteLine(bc.Take()); catch (InvalidOperationException) // IOE 表示 Take() 已在已完成的集合上调用 Console.WriteLine("That's All!"); 阻塞收集不会向消费者提供任何信号,除非消费者要轮询blockingcollection.IsCompleted/IsAddingCompleted。在经典模式中,我发布了一个生产者添加到队列中,向消费者发出信号并完成。使用阻塞 coll,我们可以将 coll 标记为已完成添加,然后将其置于无法添加更多项目的状态,直到消费者将所有项目取消队列。 @user666490:你不需要轮询——你只需要使用Take,它就会阻塞直到有一个项目准备好。如果您希望生产者阻塞直到队列中有新项目的空间,您可以指定一个有界容量。【参考方案2】:

您的第二个代码块看起来更好。但是,启动Task 然后立即等待它是没有意义的。只需调用Take,然后处理直接在消费线程上返回的项目。这就是生产者-消费者模式的意义所在。如果您认为工作项的处理足够密集以保证更多的消费者,那么一定要启动更多的消费者。 BlockingCollection 是安全的多个生产者多个消费者。

public class YourCode

  private BlockingCollection<object> queue = new BlockingCollection<object>();

  public YourCode()
  
    var thread = new Thread(StartConsuming);
    thread.IsBackground = true;
    thread.Start();
  

  public void Produce(object item)
  
    queue.Add(item);
  

  private void StartConsuming()
  
    while (true)
    
      object item = queue.Take();
      // Add your code to process the item here.
      // Do not start another task or thread. 
    
  

【讨论】:

为什么我们不应该在 StartConsuming 中启动另一个任务或线程? @Lokeshwer:这本身就可以了。不好的是开始另一个任务然后等待它。那是没有意义的。 优秀的例子;我可以建议添加使用取消令牌以确保完整性【参考方案3】:

我之前使用过一种模式,它创建了一种“按需”队列消费者(基于来自 ConcurrentQueue 的消费):

        private void FireAndForget(Action fire)
        
            _firedEvents.Enqueue(fire);
            lock (_taskLock)
            
                if (_launcherTask == null)
                
                    _launcherTask = new Task(LaunchEvents);
                    _launcherTask.ContinueWith(EventsComplete);
                    _launcherTask.Start();
                
            
        

        private void LaunchEvents()
        
            Action nextEvent;

            while (_firedEvents.TryDequeue(out nextEvent))
            
                if (_synchronized)
                
                    var syncEvent = nextEvent;
                    _mediator._syncContext.Send(state => syncEvent(), null);
                
                else
                
                    nextEvent();                        
                

                lock (_taskLock)
                
                    if (_firedEvents.Count == 0)
                    
                        _launcherTask = null;
                        break;
                    
                
            
        

        private void EventsComplete(Task task)
        
            if (task.IsFaulted && task.Exception != null)
            
                 // Do something with task Exception here
            
        

【讨论】:

谢谢。但我希望 net 4 new tpl pfx 应该使我发布的经典模式更容易维护/理解。你的方法对我来说太过分了。 如果您的主要目标是易于使用,Jon 的答案是最好的; BlockingCollection 是专门为这种非常常见的模式设计的(通过对 Take 的简单阻塞调用和对新 Cancellation 系统的内置支持。)

以上是关于使用blockingcollection和tasks .net 4 TPL的经典生产者消费者模式的主要内容,如果未能解决你的问题,请参考以下文章

BlockingCollection实现单体程序内队列

.NET(C#):线程安全集合的阻塞BlockingCollection的使用

异步简析之BlockingCollection实现生产消费模式

如何加快大块 BlockingCollection 的实现

创建异步任务队列 - 使用 BlockingCollection C#

C#阻塞队列BlockingCollection