启用 Queue<T> 并发
Posted
技术标签:
【中文标题】启用 Queue<T> 并发【英文标题】:Enabling Queue<T> with concurrency 【发布时间】:2011-06-01 02:47:54 【问题描述】:我以前有一个question,我提供了我的解决方案;但是,我无法访问ConcurrentQueue<T>
,因为我在.Net 3.5 上。我需要Queue<T>
来允许并发。我读了这个question,如果队列中的项目 not 并且线程方法尝试使项目出队,似乎会出现问题。
我现在的任务是确定我是否可以派生我自己的并发队列类。这是我想出的:
public sealed class ConcurrentQueue : Queue<DataTable>
public event EventHandler<TableQueuedEventArgs> TableQueued;
private ICollection que;
new public void Enqueue(DataTable Table)
lock (que.SyncRoot)
base.Enqueue(Table);
OnTableQueued(new TableQueuedEventArgs(Dequeue()));
// this is where I think I will have a problem...
new public DataTable Dequeue()
DataTable table;
lock (que.SyncRoot)
table = base.Dequeue();
return table;
public void OnTableQueued(TableQueuedEventArgs table)
EventHandler<TableQueuedEventArgs> handler = TableQueued;
if (handler != null)
handler(this, table);
因此,当 DataTable 排队时,EventArgs 会将出队的表传递给事件订阅者。这个实现会为我提供一个线程安全的队列吗?
【问题讨论】:
que
完全没用。你应该锁定readonly object key = new object();
。
@SLaks:我根据MSDN实现了ICollection que
和lock(que.SyncRoot)
:msdn.microsoft.com/en-us/library/bb344892.aspx
你根本不需要它。 SyncRoot
如果您有需要为同一个集合锁定的不相交的代码片段,这很有用。在您的情况下,que
是 null
。你只需要在你的方法中锁定一个对象。
我不明白你为什么要在这里使用队列,因为它会立即在同一个线程上出队。如果你想让工作线程处理出队,你可以使用生产者/消费者队列模式。 SO上有很多例子。另外,我假设这会在您入队时立即引发 nullreference 异常。
【参考方案1】:
快速浏览一下我最喜欢的搜索引擎,发现我的记忆是正确的; you can get the Task Parallel Library even on .NET 3.5。另请参阅The PFX team blog post on the subject,以及您下载的Reactive Extensions 以获得所需的System.Threading.dll
。
【讨论】:
不幸的是,我(根据内部政策)受制于 .Net 3.5 中使用的基本库 - 基本上让所有开发人员都使用相同的库。如果我尝试使用 TPLib,我将被标记为流氓。否则,一个不错的选择。 这是一个疯狂的政策。重新发明***是最严重的生产力消耗之一,也是错误的重要来源。 这种政策通常由对编程一无所知的人执行... 实际上,当 CTP 不再可用并且响应式扩展程序不支持时,该策略才有意义。【参考方案2】:您需要使用new
对基类隐藏方法这一事实通常表明您应该使用组合而不是继承...
这是一个简单的同步队列,它不使用继承但仍然依赖于标准Queue<T>
的行为:
public class ConcurrentQueue<T> : ICollection, IEnumerable<T>
private readonly Queue<T> _queue;
public ConcurrentQueue()
_queue = new Queue<T>();
public IEnumerator<T> GetEnumerator()
lock (SyncRoot)
foreach (var item in _queue)
yield return item;
IEnumerator IEnumerable.GetEnumerator()
return GetEnumerator();
public void CopyTo(Array array, int index)
lock (SyncRoot)
((ICollection)_queue).CopyTo(array, index);
public int Count
get
// Assumed to be atomic, so locking is unnecessary
return _queue.Count;
public object SyncRoot
get return ((ICollection)_queue).SyncRoot;
public bool IsSynchronized
get return true;
public void Enqueue(T item)
lock (SyncRoot)
_queue.Enqueue(item);
public T Dequeue()
lock(SyncRoot)
return _queue.Dequeue();
public T Peek()
lock (SyncRoot)
return _queue.Peek();
public void Clear()
lock (SyncRoot)
_queue.Clear();
【讨论】:
什么显式接口实现并强制转换到_queue?lock
-ing 在迭代器中是危险的。
通常,我不会派生一个类,然后在方法上使用new
。但是我不想重写基础提供的一堆方法。因为我知道我只会替换其中的两种方法,所以没有害处。所有base
方法和属性都可用,无需我做任何额外的努力。如果我的想法是错误的,那么我想解释一下为什么我不应该从 QueueQueue<T>
类型的变量使用它,你的new
方法将不会被调用
@Thomas: 只有当你Dispose()
迭代器的IEnumerator<T>
时才会释放锁。 ***.com/questions/2274664/…【参考方案3】:
您在将项目入队时将其出队。 您需要使用您的参数引发事件。
它是否真的是线程安全的取决于你如何使用它。
如果您曾经检查过Count
或检查是否为空,则它不是线程安全的,也不能轻易地使其成为线程安全的。
如果你不这样做,你可能可以使用比队列更简单的东西。
【讨论】:
入队预计比出队更快(但是,我还没有任何数据可以确认)。入队也在主线程上执行。当事件触发时,我使用操作BeginInvoke
异步运行出队过程。
另外......我从来没有检查过Count
,也没有看到有必要这样做,因为我几乎立即在工作线程上出列。我不知道什么会比 QueueBeginInvoke
。
@IAbstract 我想创建一个在项目队列上引发的事件(项目到达),你可以通过创建一个自定义事件 EventHandler 来实现。下面是一个使用 NET 3.5 的示例:experts-exchange.com/questions/28768639/…【参考方案4】:
在最初的问题之后的一段时间,我知道(这与另一个问题的右侧“相关”),但我在类似情况下使用了以下内容。对 CPU 缓存的使用没有它应有的好处,但是如果操作之间经常有很大的差距,那么简单、无锁、线程安全且通常 CPU 缓存的使用并不那么重要,以及何时不是分配的紧密程度可能会降低影响:
internal sealed class LockFreeQueue<T>
private sealed class Node
public readonly T Item;
public Node Next;
public Node(T item)
Item = item;
private volatile Node _head;
private volatile Node _tail;
public LockFreeQueue()
_head = _tail = new Node(default(T));
#pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked
public void Enqueue(T item)
Node newNode = new Node(item);
for(;;)
Node curTail = _tail;
if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null) //append to the tail if it is indeed the tail.
Interlocked.CompareExchange(ref _tail, newNode, curTail); //CAS in case we were assisted by an obstructed thread.
return;
else
Interlocked.CompareExchange(ref _tail, curTail.Next, curTail); //assist obstructing thread.
public bool TryDequeue(out T item)
for(;;)
Node curHead = _head;
Node curTail = _tail;
Node curHeadNext = curHead.Next;
if (curHead == curTail)
if (curHeadNext == null)
item = default(T);
return false;
else
Interlocked.CompareExchange(ref _tail, curHeadNext, curTail); // assist obstructing thread
else
item = curHeadNext.Item;
if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
return true;
#pragma warning restore 420
【讨论】:
【参考方案5】:在OnTableQueued(new TableQueuedEventArgs(Dequeue()));
方法中的Enqueue
行中
使用Peek 代替出列
应该是
OnTableQueued(new TableQueuedEventArgs(base.Peek()));
【讨论】:
以上是关于启用 Queue<T> 并发的主要内容,如果未能解决你的问题,请参考以下文章