如何加快大块 BlockingCollection 的实现

Posted

技术标签:

【中文标题】如何加快大块 BlockingCollection 的实现【英文标题】:How to speed up a chunky BlockingCollection implementation 【发布时间】:2019-11-16 13:27:30 【问题描述】:

我已经多次使用BlockingCollection 来实现生产者/消费者模式,但是由于相关的开销,我在处理极其精细的数据时遇到了糟糕的性能。这通常迫使我通过对数据进行分块/分区来即兴创作,换句话说,使用BlockingCollection<T[]> 而不是BlockingCollection<T>。这是resent example。这可行,但它很丑陋且容易出错。我最终在生产者和消费者处都使用了嵌套循环,并且我必须记住 Add 在生产者的工作负载结束时剩下的内容。所以我有了实现一个粗壮的BlockingCollection 的想法,它将在内部处理所有这些复杂性,并将与现有BlockingCollection 相同的简单接口外部化。我的问题是我还没有设法匹配复杂手动分区的性能。对于极其精细的数据(基本上只是整数值),我最好的尝试仍然支付大约 +100% 的性能税。所以我想在这里介绍一下我到目前为止所做的事情,希望能得到一个可以帮助我缩小性能差距的建议。

我最好的尝试是使用ThreadLocal<List<T>>,这样每个线程都在一个专用块上工作,无需任何锁。

public class ChunkyBlockingCollection1<T>

    private readonly BlockingCollection<T[]> _blockingCollection;
    public readonly int _chunkSize;
    private readonly ThreadLocal<List<T>> _chunk;

    public ChunkyBlockingCollection1(int chunkSize)
    
        _blockingCollection = new BlockingCollection<T[]>();
        _chunkSize = chunkSize;
        _chunk = new ThreadLocal<List<T>>(() => new List<T>(chunkSize), true);
    

    public void Add(T item)
    
        var chunk = _chunk.Value;
        chunk.Add(item);
        if (chunk.Count >= _chunkSize)
        
            _blockingCollection.Add(chunk.ToArray());
            chunk.Clear();
        
    

    public void CompleteAdding()
    
        var chunks = _chunk.Values.ToArray();
        foreach (var chunk in chunks)
        
            _blockingCollection.Add(chunk.ToArray());
            chunk.Clear();
        
        _blockingCollection.CompleteAdding();
    

    public IEnumerable<T> GetConsumingEnumerable()
    
        foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
        
            for (int i = 0; i < chunk.Length; i++)
            
                yield return chunk[i];
            
        
    

我的第二个最佳尝试是使用单个List&lt;T&gt; 作为块,所有线程都可以使用锁以线程安全的方式访问它。令人惊讶的是,这仅比ThreadLocal&lt;List&lt;T&gt;&gt; 解决方案慢一点。

public class ChunkyBlockingCollection2<T>

    private readonly BlockingCollection<T[]> _blockingCollection;
    public readonly int _chunkSize;
    private readonly List<T> _chunk;
    private readonly object _locker = new object();

    public ChunkyBlockingCollection2(int chunkSize)
    
        _blockingCollection = new BlockingCollection<T[]>();
        _chunkSize = chunkSize;
        _chunk = new List<T>(chunkSize);
    

    public void Add(T item)
    
        lock (_locker)
        
            _chunk.Add(item);
            if (_chunk.Count >= _chunkSize)
            
                _blockingCollection.Add(_chunk.ToArray());
                _chunk.Clear();
            
        
    

    public void CompleteAdding()
    
        lock (_locker)
        
            _blockingCollection.Add(_chunk.ToArray());
            _chunk.Clear();
        
        _blockingCollection.CompleteAdding();
    

    public IEnumerable<T> GetConsumingEnumerable()
    
        foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
        
            for (int i = 0; i < chunk.Length; i++)
            
                yield return chunk[i];
            
        
    

我还尝试将ConcurrentBag&lt;T&gt; 用作块,这导致性能不佳和正确性问题(因为我没有使用锁)。另一种尝试是用SpinLock 替换lock (_locker),性能更差。锁定显然是我问题的根源,因为如果我完全删除它,那么我的班级将获得最佳性能。当然,如果有多个生产者,移除锁会非常失败。


更新:我通过Nick 实现了无锁解决方案suggested,大量使用了Interlocked 类。在一个生产者的配置中,性能稍微好一些,但如果有两个或更多的生产者,性能就会变得更差。有许多不一致的冲突导致线程旋转。实现也很棘手,容易引入错误。

public class ChunkyBlockingCollection3<T>

    private readonly BlockingCollection<(T[], int)> _blockingCollection;
    public readonly int _chunkSize;
    private T[] _array;
    private int _arrayCount;
    private int _arrayCountOfCompleted;
    private T[] _emptyArray;

    public ChunkyBlockingCollection3(int chunkSize)
    
        _chunkSize = chunkSize;
        _blockingCollection = new BlockingCollection<(T[], int)>();
        _array = new T[chunkSize];
        _arrayCount = 0;
        _arrayCountOfCompleted = 0;
        _emptyArray = new T[chunkSize];
    

    public void Add(T item)
    
        while (true) // Spin
        
            int count = _arrayCount;
            while (true) // Spin
            
                int previous = count;
                count++;
                int result = Interlocked.CompareExchange(ref _arrayCount,
                    count, previous);
                if (result == previous) break;
                count = result;
            
            var array = Interlocked.CompareExchange(ref _array, null, null);
            if (array == null) throw new InvalidOperationException(
                    "The collection has been marked as complete.");
            if (count <= _chunkSize)
            
                // There is empty space in the array
                array[count - 1] = item;
                Interlocked.Increment(ref _arrayCountOfCompleted);
                break; // Adding is completed
            
            if (count == _chunkSize + 1)
            
                // Array is full. Push it to the BlockingCollection.
                while (Interlocked.CompareExchange(
                    ref _arrayCountOfCompleted, 0, 0) < _chunkSize)   // Spin
                _blockingCollection.Add((array, _chunkSize));
                T[] newArray;
                while ((newArray = Interlocked.CompareExchange(
                    ref _emptyArray, null, null)) == null)   // Spin
                Interlocked.Exchange(ref _array, newArray);
                Interlocked.Exchange(ref _emptyArray, null);
                Interlocked.Exchange(ref _arrayCountOfCompleted, 0);
                Interlocked.Exchange(ref _arrayCount, 0); // Unlock other threads
                Interlocked.Exchange(ref _emptyArray, new T[_chunkSize]);
            
            else
            
                // Wait other thread to replace the full array with a new one.
                while (Interlocked.CompareExchange(
                    ref _arrayCount, 0, 0) > _chunkSize)   // Spin
            
        
    

    public void CompleteAdding()
    
        var array = Interlocked.Exchange(ref _array, null);
        if (array != null)
        
            int count = Interlocked.Exchange(ref _arrayCount, -1);
            while (Interlocked.CompareExchange(
                ref _arrayCountOfCompleted, 0, 0) < count)   // Spin
            _blockingCollection.Add((array, count));
            _blockingCollection.CompleteAdding();
        
    

    public IEnumerable<T> GetConsumingEnumerable()
    
        foreach (var (array, count) in _blockingCollection.GetConsumingEnumerable())
        
            for (int i = 0; i < count; i++)
            
                yield return array[i];
            
        
    

【问题讨论】:

I need ideas 开头的标题会引起错误的注意。 @Nkosi 看来我的问题仍然没有引起注意。???? @TheodorZoulias,您尝试使用BlockingCollection&lt;T&gt;[] 吗? @DmitryStepanov 不,我没有。这将如何运作? @TheodorZoulias,看看这个 (docs.microsoft.com/en-us/dotnet/standard/collections/…) 【参考方案1】:

您可以尝试使用_chunk 的数组,而不是使用List&lt;T&gt;。然后,您可以使用 Interlocked.Increment 来增加下一个索引以填充Add,并且当您的计数超过您的块的大小时,将其全部移动到阻塞集合并在锁定中重置索引。

【讨论】:

如果能去掉Add方法中的lock就好了,但我认为不可能。如果没有lock,我会冒着在BlockinCollection 的所有元素都填充数据之前将数组推入的风险。由消费者线程处理时导致NullReferenceException @TheodorZoulias,你说得有道理。为了消除竞争条件,如果 Interlocked.Increment 返回的值超过了块的大小,那么您可以旋转等待(使用 Increment.CompareExchange)等到操作将块移动到 BlockingCollection 然后重置块的当前索引完成。 这个值得一试! 伙计,无锁编程很难!我使用Interlocked 更新了我的问题。不幸的是,结果并不乐观。

以上是关于如何加快大块 BlockingCollection 的实现的主要内容,如果未能解决你的问题,请参考以下文章

C#阻塞队列BlockingCollection

BlockingCollection 中的多个消费者是不是同时处理?

BlockingCollection实现单体程序内队列

使用 BlockingCollection<T>() 缩放连接

带有 BlockingCollection.GetConsumableEnumerable 的 Parallel.ForEach 循环

用于从 BlockingCollection 消费的计时器 vs 线程 vs RegisteredWaitHandle