消息队列和异步/等待
Posted
技术标签:
【中文标题】消息队列和异步/等待【英文标题】:MessageQueue and Async / Await 【发布时间】:2013-04-12 14:52:30 【问题描述】:我只想以异步方法接收我的消息!它冻结了我的用户界面
public async void ProcessMessages()
MessageQueue MyMessageQueue = new MessageQueue(@".\private$\MyTransactionalQueue");
MyMessageQueue.Formatter = new XmlMessageFormatter(new Type[] typeof(string) );
while (true)
MessageQueueTransaction MessageQueueTransaction = new MessageQueueTransaction();
MessageQueueTransaction.Begin();
ContainError = false;
ProcessPanel.SetWaiting();
string Body = MyMessageQueue.Receive(MessageQueueTransaction).Body.ToString();
//Do some process with body string.
MessageQueueTransaction.Commit();
我只是像任何常规方法一样调用该方法,但它没有工作! 当我使用 BackgroundWorkers 而不是 async/await 时,这段代码可以正常工作
想法?
【问题讨论】:
您的代码是否产生任何警告?您是否阅读过他们所说的并试图理解他们? 查看您的错误列表,您会看到“此异步方法缺少 'await' 运算符并将同步运行” 这不是一个本地化的问题——事实上,SO 中有很多类似的问题,人们认为async
以某种方式使他们的方法异步运行
【参考方案1】:
正如斯蒂芬所写,异步不会在线程中运行您的代码。幸运的是,您可以使用TaskFactory.FromAsync 和MessageQueue.BeginReceive/MessageQueue.EndReceive 来异步接收消息:
private async Task<Message> MyAsyncReceive()
MessageQueue queue=new MessageQueue();
...
var message=await Task.Factory.FromAsync<Message>(
queue.BeginReceive(),
queue.EndReceive);
return message;
您应该注意,虽然没有使用 Transaction 的 BeginReceive 版本。来自 BeginReceive 的文档:
不要对事务使用异步调用 BeginReceive。如果要执行事务性异步操作,请调用 BeginPeek,并将事务和(同步)Receive 方法放入为 peek 操作创建的事件处理程序中。
这是有道理的,因为无法保证您必须等待多长时间才能等待响应,或者哪个线程最终会处理完成的调用。
要使用事务,你可以这样写:
private async Task<Message> MyAsyncReceive()
var queue=new MessageQueue();
var message=await Task.Factory.FromAsync<Message>(queue.BeginPeek(),queue.EndPeek);
using (var tx = new MessageQueueTransaction())
tx.Begin();
//Someone may have taken the last message, don't wait forever
//Use a smaller timeout if the queue is local
message=queue.Receive(TimeSpan.FromSeconds(1), tx);
//Process the results inside a transaction
tx.Commit();
return message;
更新
正如 Rob 所指出的,原始代码使用了从 Peek
返回的 message
,它可能在 Peek
和 Receive
之间发生了变化。在这种情况下,第二条消息将丢失。
如果另一个客户端读取队列中的最后一条消息,仍然有可能发生阻塞。为了防止这种情况,Receive
应该有一个小的超时。
【讨论】:
这个答案很棒!它有效!如果我想多次运行这个 MyAsyncReceive 方法,他们会共享我的线程吗?所以他们会很慢? 该方法没有什么特别之处。运行时将为每次调用使用线程池中的可用线程。 这不包含竞争条件吗?message
变量可能包含与收到的消息不同的消息,不是吗? (特别是如果有多个消费者)。 peek 将返回 a 消息,但 Receive()
方法可能会收到不同的消息。我会将var message =
放在“Peek 等待者”中并使用var message = queue.Receive(tx);
确保在事务中处理正确的消息..
糟糕,应该是message=queue.Receive(tx)
。
仅使用收到的消息是不够的,因为其他人可能收到了唯一可用的消息。 Receive
应该有一个非常短的超时时间,或者应该使用游标来确保实际加载了偷看的消息【参考方案2】:
async
does not run your code on a background thread.您上面的代码应该引起编译器警告,告诉您您的方法将同步运行。
如果要在后台线程上执行方法,请使用TaskEx.Run
:
public void ProcessMessages()
...
TaskEx.Run(() => ProcessMessages());
【讨论】:
如果我使用 run 那么它将创建一个新线程!所以我不能直接修改 UI,我需要使用 Dispatch rigth 吗?像普通线程一样?或者还有其他方法吗?我只需要更新主窗体中的进度条。 我推荐使用IProgress<T>
和Progress<T>
来处理线程调度。 TAP documentation covers IProgress<T>
.
我无法将两个答案标记为正确!我的问题的答案是使用 Task.Factory.FromAsync 但这样我就不会使用新线程的力量。所以我会用运行!谢谢以上是关于消息队列和异步/等待的主要内容,如果未能解决你的问题,请参考以下文章