使用 ReceiveById 的糟糕的 MSMQ 性能
Posted
技术标签:
【中文标题】使用 ReceiveById 的糟糕的 MSMQ 性能【英文标题】:Awful MSMQ Performance Using ReceiveById 【发布时间】:2012-12-19 11:25:58 【问题描述】:每秒只有 20 条消息!这就是我的全部!这是从队列中查看 50 条消息并使用 ReceiveById 并行接收它们的代码。队列中的消息总数为 500。我也测试了其他数字。但最高限制是每秒 20 条消息!我是不是完全不碍事?
编辑1:
1 - 我需要队列是可恢复的。但有趣的是,即使我将可恢复选项设置为 false;仍然是最高限制是 20 条消息/秒。
2 - 我不得不在这里使用 MSMQ,因为涉及到一些遗留应用程序。但如果这段代码是正确的,并且前 20 名的限制确实存在,我可以说服小组切换。因此,任何更换 MSMQ 的建议(基于实际经验)都非常受欢迎(请注意,我们需要保留我们的消息,以防出现任何类型的故障)。
3 - 我已将 ThreadPool 中的线程数设置为较高的数值,以防万一,但实际上在此代码中,它会导致创建 100 - 200 个线程。我测试了从 50 到 10000 的不同数字,没有任何区别。
4 - 在每个任务中创建一个新的 MessageQueue,因为 ReceiveById 不是线程安全的。
5 - 正如我们在代码中看到的,消息大小非常小;它只是一个字符串加上一个 int。
编辑 2:[非常奇怪的新结果]
我玩过这段代码的每一部分,发现:如果我在我的任务中注释掉 singleLocal.UseJournalQueue = false; 行,我每秒最多可以读取 1200 条消息。不令人印象深刻,但在我的情况下可以接受。 奇怪的是UseJournalQueue的默认值是false;为什么再次将其设置为 false 会在性能上产生如此大的差异?
static partial class Program
static void Main(string[] args)
ThreadPool.SetMaxThreads(15000, 30000);
ThreadPool.SetMinThreads(10000, 20000);
var qName = @".\private$\deep_den";
if (!MessageQueue.Exists(qName))
var q = MessageQueue.Create(qName);
var single = new MessageQueue(qName);
single.UseJournalQueue = false;
single.DefaultPropertiesToSend.AttachSenderId = false;
single.DefaultPropertiesToSend.Recoverable = true;
single.Formatter = new XmlMessageFormatter(new[] typeof(Data) );
var count = 500;
var watch = new Stopwatch();
watch.Start();
for (int i = 0; i < count; i++)
var data = new Data Name = string.Format("name_0", i), Value = i ;
single.Send(new Message(data));
watch.Stop();
Console.WriteLine("sent 0 msec/message", watch.Elapsed.TotalMilliseconds / count);
Console.WriteLine("sent 0 message/sec", count / watch.Elapsed.TotalSeconds);
var enu = single.GetMessageEnumerator2();
watch.Reset();
watch.Start();
while (Interlocked.Read(ref __counter) < count)
var list = new List<Message>();
var peekCount = 50;
while (peekCount > 0 && enu.MoveNext(TimeSpan.FromMilliseconds(10)))
try
list.Add(enu.Current);
peekCount--;
catch (Exception ex2)
Trace.WriteLine(ex2.ToString());
break;
var tlist = new List<Task>();
foreach (var message in list)
var stupid_closure = message;
var t = new Task(() =>
using (var singleLocal = new MessageQueue(qName))
singleLocal.UseJournalQueue = false;
singleLocal.DefaultPropertiesToSend.AttachSenderId = false;
singleLocal.DefaultPropertiesToSend.Recoverable = true;
singleLocal.Formatter = new XmlMessageFormatter(new[] typeof(Data) );
try
// processing the message and insert it into database
// workflow completed here, so we can safely remove the message from queue
var localM = singleLocal.ReceiveById(stupid_closure.Id);
var localSample = (Data)localM.Body;
Interlocked.Increment(ref __counter);
Console.WriteLine(Interlocked.Read(ref __counter));
catch (MessageQueueException ex) if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout) Trace.WriteLine(ex.ToString());
catch (Exception ex2) Trace.WriteLine(ex2.ToString());
, TaskCreationOptions.PreferFairness);
tlist.Add(t);
foreach (var t in tlist) t.Start();
Task.WaitAll(tlist.ToArray());
list.Clear();
watch.Stop();
Console.WriteLine("rcvd 0 msec/message", watch.Elapsed.TotalMilliseconds / count);
Console.WriteLine("rcvd 0 message/sec", count / watch.Elapsed.TotalSeconds);
Console.WriteLine("press any key to continue ...");
Console.ReadKey();
static long __counter = 0;
【问题讨论】:
您要求线程池最少 10,000 个?这真的有效吗?似乎是一个非常高的数字。此外,每个任务都创建和使用自己的 MessageQueue 对象吗?尝试只运行一个线程作为实验。 @Chris O 感谢您的评论。我已编辑问题以反映您的观点。 为什么你偷看而不是从循环中拉出来?你的问题不是性能,你的问题是使用了一种糟糕的方法从数据库中取出消息。对于“可恢复”,这就是创建交易的目的。 @TomTom 我不知道如何引入 MSMQ。但是,如果您的意思是首先查看消息处理它,然后在事务中接收它;我不能这样做,因为流程部分 - 在我的项目中 - 需要约 50 毫秒(约 20 条消息/秒)并且我必须处理至少 100 条消息/秒。 @KavehShahbazian 如果您不需要基数,那么只需在工作线程中接收。 【参考方案1】:卡维, 您正在使用的 MessageQueue 对象的构造函数将 UseJournalQueue 属性设置为 true,以防启用 Message Queuing 对象的日志设置。不知何故,它认为 .\private$\deep_den 的日志设置已启用。编辑 - 您使用的是预先创建的队列吗?
【讨论】:
队列在行 var q = MessageQueue.Create(qName);我已经检查了 var single = new MessageQueue(qName);将 UseJournalQueue 的默认值设置为 false (.NET 4.0 VS 2012)。【参考方案2】:Kaveh,我在这里可能完全错了,但我认为你的问题是 XML 序列化。一旦创建了 XmlSerializer,它仍然可能很慢,但构造函数确实需要时间。
我建议要么完全删除序列化并将数据作为字符串读取,要么先创建一个 XmlSerializer 或 XmlMessageFormatter 并将其传递给线程。我会说要小心线程问题,但看起来你已经很好地掌握了这一点。
【讨论】:
我不这么认为,因为删除 [q].UseJournalQueue = false;它工作正常。我想知道为什么将其设置为 false 会在默认值为 false 时导致性能急剧下降。【参考方案3】:在进行基准测试时,将代码保持在最低限度很重要,以避免背景噪音干扰测试。
不幸的是,您的测试非常嘈杂,以至于很难找到导致延迟的确切原因
不要使用线程。多线程很少有帮助,而且通常弊大于利。 只测试一件事。测试 ReceiveById 时不要使用 GetMessageEnumerator2,它的成本很高,您需要在最后将其从结果中删除。 只创建一次 MessageQueue 并重复使用它。我们只测试 ReceiveById 而不是创建新的 MessageQueue 类。我重写了测试并收到了更好的结果 MSMQ 不是块上最快的队列,但它并不慢。
var qName = @".\private$\deep_den";
if (!MessageQueue.Exists(qName))
var q = MessageQueue.Create(qName);
var single = new MessageQueue(qName);
single.UseJournalQueue = true;
single.DefaultPropertiesToSend.AttachSenderId = false;
single.DefaultPropertiesToSend.Recoverable = true;
single.Formatter = new XmlMessageFormatter(new[] typeof(Data) );
var count = 500;
var watch = new Stopwatch();
watch.Start();
for (int i = 0; i < count; i++)
var data = new Data Name = string.Format("name_0", i), Value = i ;
single.Send(new Message(data));
watch.Stop();
Console.WriteLine("sent 0 msec/message", watch.Elapsed.TotalMilliseconds / count);
Console.WriteLine("sent 0 message/sec", count / watch.Elapsed.TotalSeconds);
var enu = single.GetMessageEnumerator2();
watch.Reset();
watch.Start();
var queue = new MessageQueue(qName);
queue.UseJournalQueue = true;
queue.DefaultPropertiesToSend.AttachSenderId = false;
queue.DefaultPropertiesToSend.Recoverable = true;
queue.Formatter = new XmlMessageFormatter(new[] typeof(Data) );
List<Data> lst = new List<Data>();
while (lst.Count != count && enu.MoveNext(TimeSpan.FromDays(1)))
var message = queue.ReceiveById(enu.Current.Id);
lst.Add((Data)message.Body);
watch.Stop();
Console.WriteLine("rcvd 0 msec/message", watch.Elapsed.TotalMilliseconds / count);
Console.WriteLine("rcvd 0 message/sec", count / watch.Elapsed.TotalSeconds);
Console.WriteLine("press any key to continue ...");
Console.ReadKey();
【讨论】:
Ronen 的代码运行得非常快。 (我已将 UseJournalQueue 设置为 false)。我的结果贴在下面(i5 笔记本,普通磁盘,8GB 内存):发送 0.10515438 毫秒/消息发送 9509.82736049606 消息/秒 rcvd 0.1163098 毫秒/消息 rcvd 8597.727792499 消息/秒以上是关于使用 ReceiveById 的糟糕的 MSMQ 性能的主要内容,如果未能解决你的问题,请参考以下文章