当有大量消息排队时,从 MSMQ 读取速度会变慢

Posted

技术标签:

【中文标题】当有大量消息排队时,从 MSMQ 读取速度会变慢【英文标题】:Reading from MSMQ slows down when there is a lot of messages queued 【发布时间】:2011-09-23 09:38:51 【问题描述】:

简介

我有一个基于 SEDA 的系统,并使用 MSMQ 在不同的应用程序/服务之间进行通信(事件触发)。

其中一个服务按文件获取消息,因此我有一个文件侦听器,它读取文件内容并将其插入队列(或实际上是 4 个不同的队列,但这对于第一个问题不是很重要)。

服务器是 Windows Server 2008

第一个问题 - 阅读速度变慢

我在另一端读取这些消息的应用程序通常每秒从队列中读取大约 20 条消息,但是当发布消息的服务开始排队数千条消息时,读取​​下降,读取应用程序只读取 2-每秒 4 条消息。当没有发布到队列时,读取应用程序每秒可以再次读取多达 20 条消息。

阅读应用程序的代码很简单,用C#开发,我使用System.Messaging中的Read(TimeSpan timeout)函数。

问:为什么当有大量消息发布到队列时读取速度变慢?

第二个问题 - TPS 的限制

另一个问题是关于阅读本身。如果我使用 1 或 5 个线程从队列中读取,我每秒可以读取多少条消息似乎没有区别。我还尝试实现“循环解决方案”,其中发布服务发布到一组随机的 4 个队列,并且读取应用程序有一个线程监听这些队列中的每一个,但即使我仍然只有 20 TPS从具有 1 个线程的 1 个队列、具有 4 个线程的 1 个队列或 4 个队列(每个队列一个线程)读取。

我知道线程中的处理大约需要 50 毫秒,所以如果当时只处理一条消息,则 20 TPS 是相当正确的,但多线程的线索应该是消息是并行处理的,而不是顺序处理的。

服务器上有大约 110 个不同的队列。

问:为什么即使使用多线程和使用多个队列,我一次也不能从队列中取出超过 20 条消息?

这是今天运行的代码:

// There are 4 BackgroundWorkers running this function
void bw_DoWork(object sender, DoWorkEventArgs e) 

    using(var mq = new MessageQueue(".\\content"))
    
        mq.Formatter = new BinaryMessageFormatter();

        // ShouldIRun is a bool set to false by OnStop()
        while(ShouldIRun)
        
            try
            
                using(var msg = mq.Receive(new TimeSpan(0,0,2))
                
                    ProcessMessageBody(msg.Body); // This takes 50 ms to complete
                
            
            catch(MessageQueueException mqe)
            
               // This occurs every time TimeSpan in Receive() is reached
               if(mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) 
                   continue;
            
        
    

但是即使有4个线程,似乎都在等待函数再次进入“接收”点。我也尝试过使用 4 个不同的队列(content1、content2、content3 和 content4),但我仍然每 50 毫秒处理 1 条消息。

这与 Receive() 中的 TimeSpan 有什么关系,和/或是否可以省略它?

另一个问题是,如果使用私有队列,而不是公共队列可以解决什么问题?

【问题讨论】:

【参考方案1】:

性能问题。 您没有提及是否所有代码都在服务器上运行,或者您是否有客户端远程访问服务器上的队列。从速度来看,我假设是后者。 另外,队列是事务性的吗? 消息有多大?

如果您想从队列中读取消息,您的应用程序不会连接到队列本身。一切都在本地队列管理器和远程队列管理器之间进行。队列管理器是对队列进行写入和读取的唯一进程。因此,拥有多个队列或单个队列不一定会有任何不同的表现。

因此,MSMQ 队列管理器在某些时候会成为瓶颈,因为它只能同时完成这么多的工作。您的第一个问题表明了这一点 - 当您在将消息放入的队列管理器上施加高负载时,您将消息取出的能力会减慢。例如,我建议查看性能监视器以查看 MQSVC.EXE 是否已被最大化。

【讨论】:

所有代码都在同一台服务器上运行。它仅用于在各个服务之间发送消息。它是公共队列,非事务性的。消息很小。 您需要公共队列吗?除非您需要远程管理,否则专用队列通常就足够了。您是否使用 DIRECT FormatNames 来处理队列或路径名? 其实我不需要公共队列,但是当系统创建时我不得不因为远程访问,但队列现在只被本地应用程序使用。我正在使用路径名来访问队列,但是队列的初始化只进行了一次,所以这不应该意味着太多吗? 一个简单的测试是从服务器中嗅出网络,看看是否有任何额外的流量用于查询 Active Directory。 顺便说一下,每秒 20 条消息确实非常慢。我的台式机将在 32 秒内收到 10,000 条 1kb 的可恢复消息。这没有对消息进行任何处理 - 只是从本地队列中接收它们。【参考方案2】:

你为什么使用时间跨度? - 这是一件坏事,原因如下。

在开发服务和队列时,您需要以安全的方式进行编程。队列中的每个项目都会产生一个新线程。使用时间跨度会强制每个线程使用单个计时器事件线程。这些事件必须在事件线程中等待轮到它们。

标准是每个队列事件 1 个线程 - 这通常是您的 System.Messaging.ReceiveCompletedEventArgs 事件。另一个线程是您的 onStart 事件...

每秒 20 个线程或 20 次读取可能是正确的。通常,在线程池中,您一次只能在 .net 中生成 36 个线程。

我的建议是放弃计时器事件,让您的队列简单地处理数据。

做更多这样的事情;

namespace MessageService 
 

public partial class MessageService : ServiceBase 

 

    public MessageService() 

     

        InitializeComponent(); 

     



    private string MessageDirectory = ConfigurationManager.AppSettings["MessageDirectory"]; 

    private string MessageQueue = ConfigurationManager.AppSettings["MessageQueue"]; 



    private System.Messaging.MessageQueue messageQueue = null; 



    private ManualResetEvent manualResetEvent = new ManualResetEvent(true); 





    protected override void OnStart(string[] args) 

     

        // Create directories if needed 

        if (!System.IO.Directory.Exists(MessageDirectory)) 

            System.IO.Directory.CreateDirectory(MessageDirectory); 



        // Create new message queue instance 

        messageQueue = new System.Messaging.MessageQueue(MessageQueue); 



        try 

            

            // Set formatter to allow ASCII text 

            messageQueue.Formatter = new System.Messaging.ActiveXMessageFormatter(); 

            // Assign event handler when message is received 

            messageQueue.ReceiveCompleted += 

                new System.Messaging.ReceiveCompletedEventHandler(messageQueue_ReceiveCompleted); 

            // Start listening 



            messageQueue.BeginReceive(); 

         

        catch (Exception e) 

         



         

     



    protected override void OnStop() 

     

        //Make process synchronous before closing the queue 

        manualResetEvent.WaitOne(); 





        // Clean up 

        if (this.messageQueue != null) 

         

            this.messageQueue.Close(); 

            this.messageQueue = null; 

         

     



    public void messageQueue_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e) 

     

        manualResetEvent.Reset(); 

        System.Messaging.Message completeMessage = null; 

        System.IO.FileStream fileStream = null; 

        System.IO.StreamWriter streamWriter = null; 

        string fileName = null; 

        byte[] bytes = new byte[2500000]; 

        string xmlstr = string.Empty;                

            try 

             

                // Receive the message 

                completeMessage = this.messageQueue.EndReceive(e.AsyncResult);                    

                completeMessage.BodyStream.Read(bytes, 0, bytes.Length); 



                System.Text.ASCIIEncoding ascii = new System.Text.ASCIIEncoding(); 



                long len = completeMessage.BodyStream.Length; 

                int intlen = Convert.ToInt32(len);                   

                xmlstr = ascii.GetString(bytes, 0, intlen);                   

             

            catch (Exception ex0) 

             

                //Error converting message to string                    

             

        

【讨论】:

您是否可以参考一些文档以获取声明 - “您为什么使用时间跨度? - 这是一件坏事,这就是为什么。”快速寻找一些 doco,但找不到任何东西。

以上是关于当有大量消息排队时,从 MSMQ 读取速度会变慢的主要内容,如果未能解决你的问题,请参考以下文章

我们如何加快从 MSMQ 接收消息的速度?

MSMQ 上是不是有任何事件或回调将新消息添加到队列中

C# 消息队列之MSMQ

从远程队列中读取消息

适配器减慢 BizTalk

如何在C#中使用MSMQ