C#ConcurrentBag - 如何安全地清除每个添加的N个对象
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C#ConcurrentBag - 如何安全地清除每个添加的N个对象相关的知识,希望对你有一定的参考价值。
我正在实现一个记录到AWS CloudWatch的自定义记录器,它是一个异步记录器。记录器适用于我的WCF服务,多个线程正在尝试同时记录。我已经为日志记录创建了一个线程安全的单例访问,但AWS CloudWatch SDK在完成发送当前日志之前无法发送下一个日志。
所以我的想法是将日志消息汇集在ConcurrentBag中,一旦它到达N个成员,我将批量发送到CloudWatch(SDK允许这样,所以这一切都很好)。
过程如下:任何线程调用记录器> Logger向ConcurrentBag添加消息>发送者线程不断检查计数(或同一个)>如果count> = N发送批量日志并锁定其他线程添加的同时它正在发送>重复
该过程不必等待响应开始再次添加到ConcurrentBag。
这样做的最佳方法是什么?
我的想法就像一个后备缓冲器。一个袋子装满后,您可以在atomar操作中进行交换并进行处理。我已经草拟了这样一个可以处理这个问题的类:
public class DualBag<T> : IDisposable
{
private ConcurrentBag<T> _bag;
private int _threshold;
private ManualResetEventSlim _exchangeEvent;
private CancellationTokenSource _cts;
public DualBag(int threshold)
{
_bag = new ConcurrentBag<T>();
_threshold = threshold;
_exchangeEvent = new ManualResetEventSlim();
_cts = new CancellationTokenSource();
Task.Run(() => Consume());
}
public void Add(T item)
{
_bag.Add(item);
if (_bag.Count > _threshold)
{
_exchangeEvent.Set();
}
}
private async Task Consume()
{
while (!_cts.IsCancellationRequested)
{
_exchangeEvent.Wait();
_exchangeEvent.Reset();
var _replacementBag = new ConcurrentBag<T>();
var bag = Interlocked.Exchange(ref _bag, _replacementBag);
await ConsumeBag(bag);
}
if (!_bag.IsEmpty)
await ConsumeBag(_bag);
}
public void Dispose()
{
_cts.Cancel();
}
public Task ConsumeBag(ConcurrentBag<T> bag)
{
// post entries to the api
}
}
这可以通过一些观点来改进。如果你使用Stephen Cleary asyncEx库,你可以使用AsyncManualResetEvent
使处理循环真正异步并消除大部分时间阻塞的线程。
以上是关于C#ConcurrentBag - 如何安全地清除每个添加的N个对象的主要内容,如果未能解决你的问题,请参考以下文章
如何安全地从内存中清除使用 new 关键字创建的对象(带有属性)?
如何避免 ConcurrentBag<T>.GetEnumerator() 上的争用