C#ConcurrentBag - 如何安全地清除每个添加的N个对象

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C#ConcurrentBag - 如何安全地清除每个添加的N个对象相关的知识,希望对你有一定的参考价值。

我正在实现一个记录到AWS CloudWatch的自定义记录器,它是一个异步记录器。记录器适用于我的WCF服务,多个线程正在尝试同时记录。我已经为日志记录创建了一个线程安全的单例访问,但AWS CloudWatch SDK在完成发送当前日志之前无法发送下一个日志。

所以我的想法是将日志消息汇集在ConcurrentBag中,一旦它到达N个成员,我将批量发送到CloudWatch(SDK允许这样,所以这一切都很好)。

过程如下:任何线程调用记录器> Logger向ConcurrentBag添加消息>发送者线程不断检查计数(或同一个)>如果count> = N发送批量日志并锁定其他线程添加的同时它正在发送>重复

该过程不必等待响应开始再次添加到ConcurrentBag。

这样做的最佳方法是什么?

答案

我的想法就像一个后备缓冲器。一个袋子装满后,您可以在atomar操作中进行交换并进行处理。我已经草拟了这样一个可以处理这个问题的类:

public class DualBag<T> : IDisposable
{
    private ConcurrentBag<T> _bag;
    private int _threshold;
    private ManualResetEventSlim _exchangeEvent;
    private CancellationTokenSource _cts;

    public DualBag(int threshold)
    {
        _bag = new ConcurrentBag<T>();
        _threshold = threshold;
        _exchangeEvent = new ManualResetEventSlim();
        _cts = new CancellationTokenSource();
        Task.Run(() => Consume());
    }

    public void Add(T item)
    {
        _bag.Add(item);

        if (_bag.Count > _threshold)
        {
            _exchangeEvent.Set();
        }
    }

    private async Task Consume()
    {
        while (!_cts.IsCancellationRequested)
        {
            _exchangeEvent.Wait();
            _exchangeEvent.Reset();
            var _replacementBag = new ConcurrentBag<T>();
            var bag = Interlocked.Exchange(ref _bag, _replacementBag);
            await ConsumeBag(bag);
        }

        if (!_bag.IsEmpty)
            await ConsumeBag(_bag);
    }

    public void Dispose()
    {
        _cts.Cancel();
    }

    public Task ConsumeBag(ConcurrentBag<T> bag)
    {
        // post entries to the api
    }
}

这可以通过一些观点来改进。如果你使用Stephen Cleary asyncEx库,你可以使用AsyncManualResetEvent使处理循环真正异步并消除大部分时间阻塞的线程。

以上是关于C#ConcurrentBag - 如何安全地清除每个添加的N个对象的主要内容,如果未能解决你的问题,请参考以下文章

在C#中使用Threads和ConcurrentBag

如何安全地从内存中清除使用 new 关键字创建的对象(带有属性)?

如何避免 ConcurrentBag<T>.GetEnumerator() 上的争用

C# ConcurrentBag与list

Parallel.For 和 ConcurrentBag 给出不可预测的行为

我可以在 ConcurrentBag 上使用普通的 foreach 吗?