消息队列和异步/等待

Posted

技术标签:

【中文标题】消息队列和异步/等待【英文标题】:MessageQueue and Async / Await 【发布时间】:2013-04-12 14:52:30 【问题描述】:

我只想以异步方法接收我的消息!它冻结了我的用户界面

    public async void ProcessMessages()
    
        MessageQueue MyMessageQueue = new MessageQueue(@".\private$\MyTransactionalQueue");
        MyMessageQueue.Formatter = new XmlMessageFormatter(new Type[]  typeof(string) );

        while (true)
        
            MessageQueueTransaction MessageQueueTransaction = new MessageQueueTransaction();
            MessageQueueTransaction.Begin();

            ContainError = false;
            ProcessPanel.SetWaiting();

            string Body = MyMessageQueue.Receive(MessageQueueTransaction).Body.ToString();

            //Do some process with body string.

            MessageQueueTransaction.Commit();
        
    

我只是像任何常规方法一样调用该方法,但它没有工作! 当我使用 BackgroundWorkers 而不是 async/await 时,这段代码可以正常工作

想法?

【问题讨论】:

您的代码是否产生任何警告?您是否阅读过他们所说的并试图理解他们? 查看您的错误列表,您会看到“此异步方法缺少 'await' 运算符并将同步运行” 这不是一个本地化的问题——事实上,SO 中有很多类似的问题,人们认为async 以某种方式使他们的方法异步运行 【参考方案1】:

正如斯蒂芬所写,异步不会在线程中运行您的代码。幸运的是,您可以使用TaskFactory.FromAsync 和MessageQueue.BeginReceive/MessageQueue.EndReceive 来异步接收消息:

    private  async Task<Message> MyAsyncReceive()
    
        MessageQueue queue=new MessageQueue();
        ...
        var message=await Task.Factory.FromAsync<Message>(
                           queue.BeginReceive(),
                           queue.EndReceive);

        return message;

    

您应该注意,虽然没有使用 Transaction 的 BeginReceive 版本。来自 BeginReceive 的文档:

不要对事务使用异步调用 BeginReceive。如果要执行事务性异步操作,请调用 BeginPeek,并将事务和(同步)Receive 方法放入为 peek 操作创建的事件处理程序中。

这是有道理的,因为无法保证您必须等待多长时间才能等待响应,或者哪个线程最终会处理完成的调用。

要使用事务,你可以这样写:

    private  async Task<Message> MyAsyncReceive()
    
        var queue=new MessageQueue();

        var message=await Task.Factory.FromAsync<Message>(queue.BeginPeek(),queue.EndPeek);

        using (var tx = new MessageQueueTransaction())
        
            tx.Begin();

            //Someone may have taken the last message, don't wait forever
            //Use a smaller timeout if the queue is local
            message=queue.Receive(TimeSpan.FromSeconds(1), tx);
            //Process the results inside a transaction
            tx.Commit();
        
        return message;
    

更新

正如 Rob 所指出的,原始代码使用了从 Peek 返回的 message,它可能在 PeekReceive 之间发生了变化。在这种情况下,第二条消息将丢失。

如果另一个客户端读取队列中的最后一条消息,仍然有可能发生阻塞。为了防止这种情况,Receive 应该有一个小的超时。

【讨论】:

这个答案很棒!它有效!如果我想多次运行这个 MyAsyncReceive 方法,他们会共享我的线程吗?所以他们会很慢? 该方法没有什么特别之处。运行时将为每次调用使用线程池中的可用线程。 这不包含竞争条件吗? message 变量可能包含与收到的消息不同的消息,不是吗? (特别是如果有多个消费者)。 peek 将返回 a 消息,但 Receive() 方法可能会收到不同的消息。我会将var message = 放在“Peek 等待者”中并使用var message = queue.Receive(tx); 确保在事务中处理正确的消息.. 糟糕,应该是message=queue.Receive(tx) 仅使用收到的消息是不够的,因为其他人可能收到了唯一可用的消息。 Receive 应该有一个非常短的超时时间,或者应该使用游标来确保实际加载了偷看的消息【参考方案2】:

async does not run your code on a background thread.您上面的代码应该引起编译器警告,告诉您您的方法将同步运行。

如果要在后台线程上执行方法,请使用TaskEx.Run

public void ProcessMessages()

  ...


TaskEx.Run(() => ProcessMessages());

【讨论】:

如果我使用 run 那么它将创建一个新线程!所以我不能直接修改 UI,我需要使用 Dispatch rigth 吗?像普通线程一样?或者还有其他方法吗?我只需要更新主窗体中的进度条。 我推荐使用IProgress&lt;T&gt;Progress&lt;T&gt; 来处理线程调度。 TAP documentation covers IProgress&lt;T&gt;. 我无法将两个答案标记为正确!我的问题的答案是使用 Task.Factory.FromAsync 但这样我就不会使用新线程的力量。所以我会用运行!谢谢

以上是关于消息队列和异步/等待的主要内容,如果未能解决你的问题,请参考以下文章

异步IO(协程,消息循环队列)

需要一个线程安全的异步消息队列

Redis实现简单消息队列

rabbitmq消息队列介绍

常用消息队列介绍和对比

常用消息队列介绍