如何并行处理 MSMQ 消息

Posted

技术标签:

【中文标题】如何并行处理 MSMQ 消息【英文标题】:How to process MSMQ messages in parallel 【发布时间】:2011-07-27 02:42:53 【问题描述】:

我正在编写一个 Windows 服务来使用 MSMQ 消息。该服务将有高活跃期(80k 消息很快收到)和长时间不活跃(可能几天没有新消息)。

处理消息是非常受网络限制的,所以我从并行性中获得了很大的好处。但是在不活动期间,我不想占用一堆线程等待不会很快到来的消息。

MSMQ 接口似乎非常专注于同步工作流 - 获取一条消息、处理它、获取另一条消息等。我应该如何构建我的代码,以便我可以在高活动期间利用并行性而不是平局在无活动期间启动一堆线程?使用 TPL 的奖励积分。伪代码将不胜感激。

【问题讨论】:

虽然在回调处理程序中对单个消息的处理可能看起来是同步的,但与 RPC 相比,总体方案中的消息传递概念是异步的 【参考方案1】:

多年来,我已经完成了 MSMQ(包括移动实现)的分配,您对“同步工作流”的描述是正确的。并不是说您不能通过 TPL 获取各种消息信封并在不同的内核上处理它们……限制因素是读/写队列……本质上是串行操作。例如,您不能一次发送 8 条消息(一台具有 8 个内核的计算机)。

我也有类似的需求(不使用 System.Messaging 命名空间),并在我读过 Campbell 和 Johnson 的“Microsoft.NET 并行编程”一书中的帮助下解决了这个问题。

查看他们的“并行任务”一章,特别是使用全局队列的部分,该队列与每线程本地队列协作进行工作处理(即 TPL),使用“工作窃取”算法来执行负载平衡。我在一定程度上模仿了他们的解决方案。我的系统的最终版本在性能上有很大的不同(从每秒 23 条消息到超过 200 条消息)。

根据您的系统从 0 到 80,000 所需的时间,您需要采用相同的设计并将其分布在多个服务器(每个服务器具有多个处理器和多个内核)上。从理论上讲,我的设置需要不到 7 分钟的时间来磨光所有 80K,因此通过添加第二台计算机,它将减少到大约 3 分 20 秒等,等等等等。诀窍是工作窃取逻辑。

深思……

快速编辑:顺便说一句,这台计算机是戴尔 T7500 工作站,配备双四核 Xeons @ 3GHz、24 GB RAM、Windows 7 Ultimate 64 位版本。

【讨论】:

谢谢。我曾想过类似的事情,但我试图避免不得不推出自己的工作窃取队列实现。 不管它值多少钱,我以前从来没有把这样的东西放在一起......这本书帮助我分配,我在短短几天内就完成了它并运行起来。考虑到我不是最快的编码员,我对结果很满意。拥有一堆具有不同寿命的工作项(它们是独立的 - 它们之间没有依赖关系)的基本问题对我来说是一个常见问题,因此我将其拆分为一个可重用的组件并在其他地方也使用它。跨度> 作为后续,我自己也在研究这本书,还没有全部看完,但是我发现这里提到的这本书可以在MSDN上免费获得。这是并行任务章节的直接链接:msdn.microsoft.com/en-us/library/ff963549.aspx【参考方案2】:

这是我最终所做的简化版本:

while(true) 
    int msgCount = 0;

    Parallel.ForEach(Enumerable.Range(0,20), (i) => 
        MessageQueue queue = new MessageQueue(_queuePath);

        try 
            msg = queue.Receive(TimeSpan.Zero);
            // do work

            Interlocked.Increment(ref msgCount);
        catch(MessageQueueException mqex) 
            if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) 
                return; // nothing in queue
            
            else throw;
                   
    

    if (msgCount < 20) 
        Thread.Sleep(1000); // nothing more to do, take a break

所以我尝试一次接收 20 条消息,并计算我收到的消息。对于那 20 个人,我让 TPL 进城。最后,如果我处理的消息少于 20 条,则队列为空,我将线程休眠一秒钟,然后重试。

【讨论】:

有趣。如果我没看错,那么您正在为同一路径创建 20 个队列,然后同步读取它们。我很好奇您是否在 Microsoft 的实现中遇到了任何使它们成为线程安全的锁定机制(因为它是在队列上接收或发送的串行操作)?分析这个也很有趣......队列的所有重复实例化(对 80K 消息的昂贵操作)。底线......对于您计算机上的每个核心,您应该看到处理时间的“因素”下降(大约)。效果如何? @Cirrus 是的,我在每次迭代中都新建了一个MessageQueue 对象,因为我不相信同时使用来自多个线程的单个对象是安全的。我没有分析所有这些对象创建的成本,但一个简单的优化是每个线程创建一个,而不是每次迭代创建一个。 Parallel.ForEach 对此有一个重载。 我对 MessageQueue 线程安全也有同样的疑问。来自以下链接“只有以下方法对于多线程操作是安全的:BeginPeek、BeginReceive、EndPeek、EndReceive、GetAllMessages、Peek 和 Receive。”所以大概你可以让一个 MessageQueue 实例引用闭包样式来进行并行操作。 msdn.microsoft.com/en-us/library/… 在.Net 4.0/4.5 的MessageQueue 类文档中msdn.microsoft.com/en-us/library/… 微软表示Only the GetAllMessages method is thread safe.。你的代码是如何解决这个问题的?【参考方案3】:

NServiceBus 对这个问题有一个很好的概念。它被称为Distributor。这个想法是分发器可以转发要完成的工作并将其分发到任意数量的运行子节点。取决于正在完成的工作类型,例如繁重的计算与磁盘写入相比,您可以将其分布在多个进程甚至多台机器上。

【讨论】:

【参考方案4】:

解决方案还部分取决于消息的处理方式。

我使用了托管在 Windows Server AppFabric 中的 WorkflowService,并带有 Net.Msmq 绑定和事务队列。需要事务性 net.msmq 绑定来处理乱序消息处理。工作流是一个 .Net 4.0.1 状态机,消息从不同系统进入同一个队列。例如,可以让一个系统在另一个系统发送消息以对其进行实例化之前将更新发送到状态机实例。为了启用乱序消息处理,工作流服务主机使用 BufferedReceive 来锁定消息,并反复尝试从锁定子队列中获取它们。 BufferedReceive 将最大待处理消息设置为可能的最大批处理长度,因为锁定队列中的消息返回到重试队列在前面

WF 还有一些限制设置。我的最大可能批处理长度约为 20000。我已将 MaxConcurrentCalls 设置为 64,将 MaxConcurrentInstances 设置为 20000。这导致 IIS/WAS 处理 64 个并发调用。

但是,事情就是这样,因为工作流中的接收是单向的,这并不意味着一旦接收完成,衍生的 WF 进程就会终止。在我的场景中接下来发生的事情是,在消息出列并调用 WF 实例(这是 64 个调用之一)之后,工作流引擎会安排许多后续步骤,其中一个是数据库操作。

问题在于,最多 64 次调用可能是最大的,但如果消息消耗率高于异步进程完成率,则在处理传入的一批消息时,执行线程数会更高(在我的案例WF实例)。这可能会导致意外的事情发生,例如 ADO.NET 连接池的最大连接数默认为 100。这将导致进程超时等待来自耗尽池的连接。对于这个特殊问题,您可以提高 MaxPoolSize 值,也可以使用 Service Broker 异步处理数据库操作(这意味着工作流程更加复杂)。

希望这对某人有所帮助。

【讨论】:

【参考方案5】:

我是这样做的(动态更改代码,以免出现拼写错误):

for (int i = 0; i < numberOfSimultaneousRequests; i++)
            priorityQueue.BeginReceive(TimeSpan.FromDays(30), state, callback);

回调看起来像这样:

private void ProcessMessage(IAsyncResult asyncResult)
    
        try
        
            Message msg = priorityQueue.EndReceive(asyncResult);
            //Do something with the message
        
        finally
        
            priorityQueue.BeginDequeue(null, ProcessMessage);//start processing another one
        

【讨论】:

【参考方案6】:

只是尝试以某种方式相似,tpl 似乎能够在遇到物理问题时抛出某种线程安全异常,例如尝试在 tpl foreach 之外创建一个 sqlconnection 并在循环体中使用它 - 它抛出了一个异常为了我。我在进入正文之前新建了一个队列,枚举了一个字符串列表,看起来还不错,我的代码在 i7 2500 8gb 和本地 msmq 上使用 1way 消息传递始终在 500 毫秒以下处理 10000 个项目

【讨论】:

【参考方案7】:

我在一个名为 CodeRonin 的博客上找到了一个完整的解决方案。在我看来,这是整个互联网上唯一完整的例子。谢谢你,CodeRonin!

http://code-ronin.blogspot.de/2008/09/msmq-transactional-message-processing.html

【讨论】:

正确,如果您没有在事务模式下使用 MSMQ,而是使用并行或分布式处理,您的故障模式可能涉及丢失或复制数据。如果您使用 MSMQ 并且不关心故障模式,则可能不需要使用 MSMQ。

以上是关于如何并行处理 MSMQ 消息的主要内容,如果未能解决你的问题,请参考以下文章

如何在确保每个实体 FIFO 的同时并行处理消息?

如何在 apache camel 中执行 gcp pubsub 消息的并行处理

异常消息: 查询处理器未能为执行并行查询启动必要的线程资源。

EOS 消息设计并行处理

Kafka消费者可以并行处理多条消息吗

c#数据流或任务,消费消息并行处理