当有大量消息排队时,从 MSMQ 读取速度会变慢
Posted
技术标签:
【中文标题】当有大量消息排队时,从 MSMQ 读取速度会变慢【英文标题】:Reading from MSMQ slows down when there is a lot of messages queued 【发布时间】:2011-09-23 09:38:51 【问题描述】:简介
我有一个基于 SEDA 的系统,并使用 MSMQ 在不同的应用程序/服务之间进行通信(事件触发)。
其中一个服务按文件获取消息,因此我有一个文件侦听器,它读取文件内容并将其插入队列(或实际上是 4 个不同的队列,但这对于第一个问题不是很重要)。
服务器是 Windows Server 2008
第一个问题 - 阅读速度变慢
我在另一端读取这些消息的应用程序通常每秒从队列中读取大约 20 条消息,但是当发布消息的服务开始排队数千条消息时,读取下降,读取应用程序只读取 2-每秒 4 条消息。当没有发布到队列时,读取应用程序每秒可以再次读取多达 20 条消息。
阅读应用程序的代码很简单,用C#开发,我使用System.Messaging中的Read(TimeSpan timeout)函数。
问:为什么当有大量消息发布到队列时读取速度变慢?
第二个问题 - TPS 的限制
另一个问题是关于阅读本身。如果我使用 1 或 5 个线程从队列中读取,我每秒可以读取多少条消息似乎没有区别。我还尝试实现“循环解决方案”,其中发布服务发布到一组随机的 4 个队列,并且读取应用程序有一个线程监听这些队列中的每一个,但即使我仍然只有 20 TPS从具有 1 个线程的 1 个队列、具有 4 个线程的 1 个队列或 4 个队列(每个队列一个线程)读取。
我知道线程中的处理大约需要 50 毫秒,所以如果当时只处理一条消息,则 20 TPS 是相当正确的,但多线程的线索应该是消息是并行处理的,而不是顺序处理的。
服务器上有大约 110 个不同的队列。
问:为什么即使使用多线程和使用多个队列,我一次也不能从队列中取出超过 20 条消息?
这是今天运行的代码:
// There are 4 BackgroundWorkers running this function
void bw_DoWork(object sender, DoWorkEventArgs e)
using(var mq = new MessageQueue(".\\content"))
mq.Formatter = new BinaryMessageFormatter();
// ShouldIRun is a bool set to false by OnStop()
while(ShouldIRun)
try
using(var msg = mq.Receive(new TimeSpan(0,0,2))
ProcessMessageBody(msg.Body); // This takes 50 ms to complete
catch(MessageQueueException mqe)
// This occurs every time TimeSpan in Receive() is reached
if(mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
continue;
但是即使有4个线程,似乎都在等待函数再次进入“接收”点。我也尝试过使用 4 个不同的队列(content1、content2、content3 和 content4),但我仍然每 50 毫秒处理 1 条消息。
这与 Receive() 中的 TimeSpan 有什么关系,和/或是否可以省略它?
另一个问题是,如果使用私有队列,而不是公共队列可以解决什么问题?
【问题讨论】:
【参考方案1】:性能问题。 您没有提及是否所有代码都在服务器上运行,或者您是否有客户端远程访问服务器上的队列。从速度来看,我假设是后者。 另外,队列是事务性的吗? 消息有多大?
如果您想从队列中读取消息,您的应用程序不会连接到队列本身。一切都在本地队列管理器和远程队列管理器之间进行。队列管理器是对队列进行写入和读取的唯一进程。因此,拥有多个队列或单个队列不一定会有任何不同的表现。
因此,MSMQ 队列管理器在某些时候会成为瓶颈,因为它只能同时完成这么多的工作。您的第一个问题表明了这一点 - 当您在将消息放入的队列管理器上施加高负载时,您将消息取出的能力会减慢。例如,我建议查看性能监视器以查看 MQSVC.EXE 是否已被最大化。
【讨论】:
所有代码都在同一台服务器上运行。它仅用于在各个服务之间发送消息。它是公共队列,非事务性的。消息很小。 您需要公共队列吗?除非您需要远程管理,否则专用队列通常就足够了。您是否使用 DIRECT FormatNames 来处理队列或路径名? 其实我不需要公共队列,但是当系统创建时我不得不因为远程访问,但队列现在只被本地应用程序使用。我正在使用路径名来访问队列,但是队列的初始化只进行了一次,所以这不应该意味着太多吗? 一个简单的测试是从服务器中嗅出网络,看看是否有任何额外的流量用于查询 Active Directory。 顺便说一下,每秒 20 条消息确实非常慢。我的台式机将在 32 秒内收到 10,000 条 1kb 的可恢复消息。这没有对消息进行任何处理 - 只是从本地队列中接收它们。【参考方案2】:你为什么使用时间跨度? - 这是一件坏事,原因如下。
在开发服务和队列时,您需要以安全的方式进行编程。队列中的每个项目都会产生一个新线程。使用时间跨度会强制每个线程使用单个计时器事件线程。这些事件必须在事件线程中等待轮到它们。
标准是每个队列事件 1 个线程 - 这通常是您的 System.Messaging.ReceiveCompletedEventArgs 事件。另一个线程是您的 onStart 事件...
每秒 20 个线程或 20 次读取可能是正确的。通常,在线程池中,您一次只能在 .net 中生成 36 个线程。
我的建议是放弃计时器事件,让您的队列简单地处理数据。
做更多这样的事情;
namespace MessageService
public partial class MessageService : ServiceBase
public MessageService()
InitializeComponent();
private string MessageDirectory = ConfigurationManager.AppSettings["MessageDirectory"];
private string MessageQueue = ConfigurationManager.AppSettings["MessageQueue"];
private System.Messaging.MessageQueue messageQueue = null;
private ManualResetEvent manualResetEvent = new ManualResetEvent(true);
protected override void OnStart(string[] args)
// Create directories if needed
if (!System.IO.Directory.Exists(MessageDirectory))
System.IO.Directory.CreateDirectory(MessageDirectory);
// Create new message queue instance
messageQueue = new System.Messaging.MessageQueue(MessageQueue);
try
// Set formatter to allow ASCII text
messageQueue.Formatter = new System.Messaging.ActiveXMessageFormatter();
// Assign event handler when message is received
messageQueue.ReceiveCompleted +=
new System.Messaging.ReceiveCompletedEventHandler(messageQueue_ReceiveCompleted);
// Start listening
messageQueue.BeginReceive();
catch (Exception e)
protected override void OnStop()
//Make process synchronous before closing the queue
manualResetEvent.WaitOne();
// Clean up
if (this.messageQueue != null)
this.messageQueue.Close();
this.messageQueue = null;
public void messageQueue_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e)
manualResetEvent.Reset();
System.Messaging.Message completeMessage = null;
System.IO.FileStream fileStream = null;
System.IO.StreamWriter streamWriter = null;
string fileName = null;
byte[] bytes = new byte[2500000];
string xmlstr = string.Empty;
try
// Receive the message
completeMessage = this.messageQueue.EndReceive(e.AsyncResult);
completeMessage.BodyStream.Read(bytes, 0, bytes.Length);
System.Text.ASCIIEncoding ascii = new System.Text.ASCIIEncoding();
long len = completeMessage.BodyStream.Length;
int intlen = Convert.ToInt32(len);
xmlstr = ascii.GetString(bytes, 0, intlen);
catch (Exception ex0)
//Error converting message to string
【讨论】:
您是否可以参考一些文档以获取声明 - “您为什么使用时间跨度? - 这是一件坏事,这就是为什么。”快速寻找一些 doco,但找不到任何东西。以上是关于当有大量消息排队时,从 MSMQ 读取速度会变慢的主要内容,如果未能解决你的问题,请参考以下文章