如何使用ConcurrentQueue进行线程处理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用ConcurrentQueue进行线程处理相关的知识,希望对你有一定的参考价值。
我想弄清楚使用队列的最佳方法是什么。我有一个返回DataTable的进程。反过来,每个DataTable都与之前的DataTable合并。有一个问题,在最终的BulkCopy(OutOfMemory)之前要保留的记录太多。
所以,我已经确定我应该立即处理每个传入的DataTable。考虑ConcurrentQueue<T>
...但我不知道WriteQueuedData()
方法如何知道将表出列并将其写入数据库。
例如:
public class TableTransporter
{
private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>();
public TableTransporter()
{
tableQueue.OnItemQueued += new EventHandler(WriteQueuedData); // no events available
}
public void ExtractData()
{
DataTable table;
// perform data extraction
tableQueue.Enqueue(table);
}
private void WriteQueuedData(object sender, EventArgs e)
{
BulkCopy(e.Table);
}
}
我的第一个问题是,除了我实际上没有订阅任何事件的事实,如果我异步调用ExtractData()
这将是我需要的全部吗?第二,我是否缺少关于ConcurrentQueue<T>
函数的方式,需要某种形式的触发器与排队对象异步工作?
更新我刚刚从ConcurrentQueue<T>
派生了一个具有OnItemQueued事件处理程序的类。然后:
new public void Enqueue (DataTable Table)
{
base.Enqueue(Table);
OnTableQueued(new TableQueuedEventArgs(Table));
}
public void OnTableQueued(TableQueuedEventArgs table)
{
EventHandler<TableQueuedEventArgs> handler = TableQueued;
if (handler != null)
{
handler(this, table);
}
}
对此实施的任何担忧?
从我对这个问题的理解来看,你遗漏了一些东西。
并发队列是一种数据结构,旨在接受多个线程读取和写入队列,而无需显式锁定数据结构。 (所有爵士乐都在幕后处理,或者集合以不需要锁定的方式实现。)
考虑到这一点,看起来您尝试使用的模式是“生产/消费者”。首先,您有一些任务产生工作(并向队列中添加项目)。第二个你有第二个任务从队列中消耗东西(和dequeing items)。
所以你真的需要两个线程:一个添加项目,另一个删除项目。因为您使用的是并发集合,所以可以让多个线程添加项目,多个线程删除项目。但显然你对并发队列的争论越多,就会越快成为瓶颈。
我认为ConcurrentQueue
仅在极少数情况下有用。它的主要优点是无锁。但是,通常生产者线程必须以某种方式通知消费者线程有可用于处理的数据。线程之间的这种信令需要锁定并否定使用ConcurrentQueue
的好处。同步线程的最快方法是使用Monitor.Pulse()
,它只能在锁中使用。所有其他同步工具甚至更慢。
当然,消费者可以不断检查队列中是否存在某些东西,这些东西在没有锁定的情况下工作,但却极大地浪费了处理器资源。如果消费者在检查之间等待,那就更好了。
写入队列时引发线程是一个非常糟糕的主意。使用ConcurrentQueue
节省可能1微秒将完全浪费执行eventhandler
,这可能需要1000倍的时间。
如果所有处理都是在事件处理程序或异步调用中完成的,那么问题是为什么还需要一个队列?最好将数据直接传递给处理程序,根本不使用队列。
请注意,ConcurrentQueue
的实现相当复杂,以允许并发。在大多数情况下,最好使用普通的Queue<>
并锁定对队列的每次访问。由于队列访问只需要几微秒,因此2个线程在同一微秒内访问队列的可能性极小,并且由于锁定几乎没有任何延迟。使用带锁定的普通Queue<>
通常会导致代码执行速度比ConcurrentQueue
更快。
这是我提出的完整解决方案:
public class TableTransporter
{
private static int _indexer;
private CustomQueue tableQueue = new CustomQueue();
private Func<DataTable, String> RunPostProcess;
private string filename;
public TableTransporter()
{
RunPostProcess = new Func<DataTable, String>(SerializeTable);
tableQueue.TableQueued += new EventHandler<TableQueuedEventArgs>(tableQueue_TableQueued);
}
void tableQueue_TableQueued(object sender, TableQueuedEventArgs e)
{
// do something with table
// I can't figure out is how to pass custom object in 3rd parameter
RunPostProcess.BeginInvoke(e.Table,new AsyncCallback(PostComplete), filename);
}
public void ExtractData()
{
// perform data extraction
tableQueue.Enqueue(MakeTable());
Console.WriteLine("Table count [{0}]", tableQueue.Count);
}
private DataTable MakeTable()
{ return new DataTable(String.Format("Table{0}", _indexer++)); }
private string SerializeTable(DataTable Table)
{
string file = Table.TableName + ".xml";
DataSet dataSet = new DataSet(Table.TableName);
dataSet.Tables.Add(Table);
Console.WriteLine("[{0}]Writing {1}", Thread.CurrentThread.ManagedThreadId, file);
string xmlstream = String.Empty;
using (MemoryStream memstream = new MemoryStream())
{
XmlSerializer xmlSerializer = new XmlSerializer(typeof(DataSet));
XmlTextWriter xmlWriter = new XmlTextWriter(memstream, Encoding.UTF8);
xmlSerializer.Serialize(xmlWriter, dataSet);
xmlstream = UTF8ByteArrayToString(((MemoryStream)xmlWriter.BaseStream).ToArray());
using (var fileStream = new FileStream(file, FileMode.Create))
fileStream.Write(StringToUTF8ByteArray(xmlstream), 0, xmlstream.Length + 2);
}
filename = file;
return file;
}
private void PostComplete(IAsyncResult iasResult)
{
string file = (string)iasResult.AsyncState;
Console.WriteLine("[{0}]Completed: {1}", Thread.CurrentThread.ManagedThreadId, file);
RunPostProcess.EndInvoke(iasResult);
}
public static String UTF8ByteArrayToString(Byte[] ArrBytes)
{ return new UTF8Encoding().GetString(ArrBytes); }
public static Byte[] StringToUTF8ByteArray(String XmlString)
{ return new UTF8Encoding().GetBytes(XmlString); }
}
public sealed class CustomQueue : ConcurrentQueue<DataTable>
{
public event EventHandler<TableQueuedEventArgs> TableQueued;
public CustomQueue()
{ }
public CustomQueue(IEnumerable<DataTable> TableCollection)
: base(TableCollection)
{ }
new public void Enqueue (DataTable Table)
{
base.Enqueue(Table);
OnTableQueued(new TableQueuedEventArgs(Table));
}
public void OnTableQueued(TableQueuedEventArgs table)
{
EventHandler<TableQueuedEventArgs> handler = TableQueued;
if (handler != null)
{
handler(this, table);
}
}
}
public class TableQueuedEventArgs : EventArgs
{
#region Fields
#endregion
#region Init
public TableQueuedEventArgs(DataTable Table)
{this.Table = Table;}
#endregion
#region Functions
#endregion
#region Properties
public DataTable Table
{get;set;}
#endregion
}
作为概念的证明,它似乎工作得很好。我最多看到4个工作线程。
以上是关于如何使用ConcurrentQueue进行线程处理的主要内容,如果未能解决你的问题,请参考以下文章
C#:高效的线程安全队列ConcurrentQueue<T>