BlockingCollection 中的多个消费者是不是同时处理?

Posted

技术标签:

【中文标题】BlockingCollection 中的多个消费者是不是同时处理?【英文标题】:Are multiple consumers in BlockingCollection handled concurrently?BlockingCollection 中的多个消费者是否同时处理? 【发布时间】:2021-05-18 04:32:46 【问题描述】:

我有一个 BlockingCollection,我从一个线程写入并从另一个线程读取。 生产者线程获取从服务器接收的项目并将它们添加到 BlockingCollection,而读取线程尝试清空 BlockingCollection 并处理它们。

我正在尝试批量清空队列的问题,因为一个一个处理它们会太慢。但是当它不断被写入(数千个项目)时,消费者线程会一直读取它们直到它被清空,这意味着在写入完成之前处理甚至不会开始。

现在,消费者中的处理可以并行完成,所以我一直在想如何去做。

目前我有两个想法:

    从消费者中的 BlockingCollection 读取一定数量的项目后,启动一个新的并行作业来处理它们,而不是等待完全清空队列然后开始处理。

    使用多个消费者并希望它们并行运行,而不是在尝试同时读取 BlockingCollection 时不断地相互阻塞。

所以我的问题是关于选项 2 - BlockingCollection 是否针对这种情况进行了内部优化?它会划分读取的区域,还是消费者会为每个项目争吵?如果是这样,那选项 1 更胜一筹?

【问题讨论】:

Fildor 是在建议选项 1,确定吗? But when it's being constantly written to (thousands of items), then the consumer thread keeps reading them until it's emptied, which means that the processing will not even start until the writing is done.这句话我读了好几遍了,不知道是什么意思。 @Fildor 啊,在这种情况下,OP 应该“停止这样做”。 :) Use multiple consumers and hope that they will run in parallel instead of just constantly blocking each other while trying to read the BlockingCollection at the same time. 这确实是它的工作,所以我认为它会没事的。但可以确定的是个人资料。如果有问题,请让单个消费者将条目批量输入List<entry>,然后将它们添加到 second BlockingCollection(由多个阅读器使用)以减少争用。但老实说,这不太可能需要。 在您跳转到更多定制解决方案之前,您为什么不尝试对您的当前解决方案进行简单修复,该解决方案现已由 2 个用户提出建议?如果这不让您满意,您为什么不采取相反的方式,尝试由非常聪明的人和数十名测试人员创建的更高抽象,并且已经在该领域使用了几年?跨度> 【参考方案1】:

添加另一种选择:(绝不是生产就绪!)

这利用了 TPL 的数据流,BatchBlock<T> 为我们抽象了批处理。

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class HoneyBatcher

    private const int BATCHSIZE = 10; // Find the size that works best for you.
    private readonly BatchBlock<Honey> batchBlock;
    private readonly ExecutionDataflowBlockOptions _options = 
                     new ExecutionDataflowBlockOptions()
    
         // I'd start with 1, then benchmark if higher number actually benefits.
         MaxDegreeOfParallelism = 1, 
         SingleProducerConstrained = true // if so, may micro-optimize throughput
    ;
                       // vv Whatever process you want done on a batch
    public HoneyBatcher( Action<Honey[]> batchProcessor )
    
        // BatchBlock does the batching
        // and is the entrypoint to the pipline.
        batchBlock = new BatchBlock<Honey>(BATCHSIZE);
        // processorBlock processes each batch that batchBlock will produce
        // Parallel executions as well as other tweaks can be configured through options.
        ActionBlock<Honey[]> processorBlock = 
                             new ActionBlock<Honey[]>(batchProcessor, _options);
        // build the pipline
        batchBlock.LinkTo(processorBlock);
        // item => batchBlock => item[BATCHSIZE] => batchProcessor(item[])
    

    // Add item individually and have them batched up
    // and processed in a pipeline.
    public Task<bool> ProcessAsync(Honey item)
    
        return batchBlock.SendAsync(item);
        // Can also be done with sync API.
    


public class Honey 

    // Just a dummy

请注意,上面的 sn-p 只是想法的粗略布局。在生产中,您当然会处理错误处理、完成等问题。

【讨论】:

【参考方案2】:

批量处理项目的一种自然方法是将它们插入BlockingCollection 中,而不是尝试在以后批量删除它们。换句话说,您可以使用BlockingCollection&lt;T[]&gt; 而不是BlockingCollection&lt;T&gt;。生产者线程可以使用Queue&lt;T&gt; 轻松完成批处理:

var queue = new Queue<T>;
while (someCondition)

    var item = ProduceItem();
    queue.Enqueue(item);
    if (queue.Count == batchSize)
    
        blockingCollection.Add(queue.ToArray());
        queue.Clear();
    

if (queue.Count > 0)

    blockingCollection.Add(queue.ToArray());
    queue.Clear();

blockingCollection.CompleteAdding();

根据具体情况,您还可以使用一些 LINQ 样式的运算符,例如 MoreLinq 库中的 Batch

最后,回答您的主要问题,是的,BlockingCollection 类可以出色地处理多个消费者和多个生产者。如果集合为空,则所有消费者都会被阻止,并且当物品到达时,它会被提供给等待中的消费者之一。

【讨论】:

请注意Batch 不能很好地与任何实现ICollection 的并发数据结构配合使用。 @mjwills 哦,我不知道。你有什么要读的,你会推荐吗? @Fildor 大多数 LINQ 都是这种情况 - 但请参阅 github.com/morelinq/MoreLINQ/issues/794 示例。 @Fildor 另一个例子:Calling ToList() on ConcurrentDictionary<TKey, TValue> while adding items。来自documentation:“通过ConcurrentDictionary&lt;TKey,TValue&gt; 实现的接口之一访问的成员(包括扩展方法)不能保证是线程安全的,可能需要由调用者同步。”

以上是关于BlockingCollection 中的多个消费者是不是同时处理?的主要内容,如果未能解决你的问题,请参考以下文章

异步简析之BlockingCollection实现生产消费模式

csharp .NET 4 BlockingCollection的生产者/消费者实现

使用blockingcollection和tasks .net 4 TPL的经典生产者消费者模式

利用BlockingCollection实现生产者和消费者队列,实现写文本

如何加快大块 BlockingCollection 的实现

C#阻塞队列BlockingCollection