使用 ReceiveById 的糟糕的 MSMQ 性能

Posted

技术标签:

【中文标题】使用 ReceiveById 的糟糕的 MSMQ 性能【英文标题】:Awful MSMQ Performance Using ReceiveById 【发布时间】:2012-12-19 11:25:58 【问题描述】:

每秒只有 20 条消息!这就是我的全部!这是从队列中查看 50 条消息并使用 ReceiveById 并行接收它们的代码。队列中的消息总数为 500。我也测试了其他数字。但最高限制是每秒 20 条消息!我是不是完全不碍事?

编辑1:

1 - 我需要队列是可恢复的。但有趣的是,即使我将可恢复选项设置为 false;仍然是最高限制是 20 条消息/秒。

2 - 我不得不在这里使用 MSMQ,因为涉及到一些遗留应用程序。但如果这段代码是正确的,并且前 20 名的限制确实存在,我可以说服小组切换。因此,任何更换 MSMQ 的建议(基于实际经验)都非常受欢迎(请注意,我们需要保留我们的消息,以防出现任何类型的故障)。

3 - 我已将 ThreadPool 中的线程数设置为较高的数值,以防万一,但实际上在此代码中,它会导致创建 100 - 200 个线程。我测试了从 50 到 10000 的不同数字,没有任何区别。

4 - 在每个任务中创建一个新的 MessageQueue,因为 ReceiveById 不是线程安全的。

5 - 正如我们在代码中看到的,消息大小非常小;它只是一个字符串加上一个 int。

编辑 2:[非常奇怪的新结果]

我玩过这段代码的每一部分,发现:如果我在我的任务中注释掉 singleLocal.UseJournalQueue = false; 行,我每秒最多可以读取 1200 条消息。不令人印象深刻,但在我的情况下可以接受。 奇怪的是UseJournalQueue的默认值是false;为什么再次将其设置为 false 会在性能上产生如此大的差异?

static partial class Program

    static void Main(string[] args)
    
        ThreadPool.SetMaxThreads(15000, 30000);
        ThreadPool.SetMinThreads(10000, 20000);

        var qName = @".\private$\deep_den";

        if (!MessageQueue.Exists(qName))
        
            var q = MessageQueue.Create(qName);
        

        var single = new MessageQueue(qName);
        single.UseJournalQueue = false;
        single.DefaultPropertiesToSend.AttachSenderId = false;
        single.DefaultPropertiesToSend.Recoverable = true;
        single.Formatter = new XmlMessageFormatter(new[]  typeof(Data) );

        var count = 500;
        var watch = new Stopwatch();

        watch.Start();
        for (int i = 0; i < count; i++)
        
            var data = new Data  Name = string.Format("name_0", i), Value = i ;

            single.Send(new Message(data));
        
        watch.Stop();

        Console.WriteLine("sent 0 msec/message", watch.Elapsed.TotalMilliseconds / count);
        Console.WriteLine("sent 0 message/sec", count / watch.Elapsed.TotalSeconds);

        var enu = single.GetMessageEnumerator2();

        watch.Reset();
        watch.Start();
        while (Interlocked.Read(ref __counter) < count)
        
            var list = new List<Message>();
            var peekCount = 50;

            while (peekCount > 0 && enu.MoveNext(TimeSpan.FromMilliseconds(10)))
            
                try
                
                    list.Add(enu.Current);
                    peekCount--;
                
                catch (Exception ex2)
                
                    Trace.WriteLine(ex2.ToString());
                    break;
                
            

            var tlist = new List<Task>();
            foreach (var message in list)
            
                var stupid_closure = message;

                var t = new Task(() =>
                
                    using (var singleLocal = new MessageQueue(qName))
                    
                        singleLocal.UseJournalQueue = false;
                        singleLocal.DefaultPropertiesToSend.AttachSenderId = false;
                        singleLocal.DefaultPropertiesToSend.Recoverable = true;
                        singleLocal.Formatter = new XmlMessageFormatter(new[]  typeof(Data) );

                        try
                        
                            // processing the message and insert it into database
                            // workflow completed here, so we can safely remove the message from queue

                            var localM = singleLocal.ReceiveById(stupid_closure.Id);
                            var localSample = (Data)localM.Body;

                            Interlocked.Increment(ref __counter);
                            Console.WriteLine(Interlocked.Read(ref __counter));
                        
                        catch (MessageQueueException ex)  if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout) Trace.WriteLine(ex.ToString()); 
                        catch (Exception ex2)  Trace.WriteLine(ex2.ToString()); 
                    
                , TaskCreationOptions.PreferFairness);

                tlist.Add(t);
            

            foreach (var t in tlist) t.Start();

            Task.WaitAll(tlist.ToArray());

            list.Clear();
        
        watch.Stop();
        Console.WriteLine("rcvd 0 msec/message", watch.Elapsed.TotalMilliseconds / count);
        Console.WriteLine("rcvd 0 message/sec", count / watch.Elapsed.TotalSeconds);

        Console.WriteLine("press any key to continue ...");
        Console.ReadKey();
    
    static long __counter = 0;

【问题讨论】:

您要求线程池最少 10,000 个?这真的有效吗?似乎是一个非常高的数字。此外,每个任务都创建和使用自己的 MessageQueue 对象吗?尝试只运行一个线程作为实验。 @Chris O 感谢您的评论。我已编辑问题以反映您的观点。 为什么你偷看而不是从循环中拉出来?你的问题不是性能,你的问题是使用了一种糟糕的方法从数据库中取出消息。对于“可恢复”,这就是创建交易的目的。 @TomTom 我不知道如何引入 MSMQ。但是,如果您的意思是首先查看消息处理它,然后在事务中接收它;我不能这样做,因为流程部分 - 在我的项目中 - 需要约 50 毫秒(约 20 条消息/秒)并且我必须处理至少 100 条消息/秒。 @KavehShahbazian 如果您不需要基数,那么只需在工作线程中接收。 【参考方案1】:

卡维, 您正在使用的 MessageQueue 对象的构造函数将 UseJournalQueue 属性设置为 true,以防启用 Message Queuing 对象的日志设置。不知何故,它认为 .\private$\deep_den 的日志设置已启用。编辑 - 您使用的是预先创建的队列吗?

【讨论】:

队列在行 var q = MessageQueue.Create(qName);我已经检查了 var single = new MessageQueue(qName);将 UseJournalQueue 的默认值设置为 false (.NET 4.0 VS 2012)。【参考方案2】:

Kaveh,我在这里可能完全错了,但我认为你的问题是 XML 序列化。一旦创建了 XmlSerializer,它仍然可能很慢,但构造函数确实需要时间。

我建议要么完全删除序列化并将数据作为字符串读取,要么先创建一个 XmlSerializer 或 XmlMessageFormatter 并将其传递给线程。我会说要小心线程问题,但看起来你已经很好地掌握了这一点。

【讨论】:

我不这么认为,因为删除 [q].UseJournalQueue = false;它工作正常。我想知道为什么将其设置为 false 会在默认值为 false 时导致性能急剧下降。【参考方案3】:

在进行基准测试时,将代码保持在最低限度很重要,以避免背景噪音干扰测试。

不幸的是,您的测试非常嘈杂,以至于很难找到导致延迟的确切原因

不要使用线程。多线程很少有帮助,而且通常弊大于利。 只测试一件事。测试 ReceiveById 时不要使用 GetMessageEnumerator2,它的成本很高,您需要在最后将其从结果中删除。 只创建一次 MessageQueue 并重复使用它。我们只测试 ReceiveById 而不是创建新的 MessageQueue 类。

我重写了测试并收到了更好的结果 MSMQ 不是块上最快的队列,但它并不慢。

    var qName = @".\private$\deep_den";

    if (!MessageQueue.Exists(qName))
    
        var q = MessageQueue.Create(qName);
    

    var single = new MessageQueue(qName);
    single.UseJournalQueue = true;
    single.DefaultPropertiesToSend.AttachSenderId = false;
    single.DefaultPropertiesToSend.Recoverable = true;
    single.Formatter = new XmlMessageFormatter(new[]  typeof(Data) );

    var count = 500;
    var watch = new Stopwatch();

    watch.Start();
    for (int i = 0; i < count; i++)
    
        var data = new Data  Name = string.Format("name_0", i), Value = i ;

        single.Send(new Message(data));
    
    watch.Stop();

    Console.WriteLine("sent 0 msec/message", watch.Elapsed.TotalMilliseconds / count);
    Console.WriteLine("sent 0 message/sec", count / watch.Elapsed.TotalSeconds);

    var enu = single.GetMessageEnumerator2();

    watch.Reset();
    watch.Start();

    var queue = new MessageQueue(qName);
    queue.UseJournalQueue = true;
    queue.DefaultPropertiesToSend.AttachSenderId = false;
    queue.DefaultPropertiesToSend.Recoverable = true;
    queue.Formatter = new XmlMessageFormatter(new[]  typeof(Data) );

    List<Data> lst = new List<Data>();
    while (lst.Count != count && enu.MoveNext(TimeSpan.FromDays(1)))
    
        var message = queue.ReceiveById(enu.Current.Id);
        lst.Add((Data)message.Body);
    
    watch.Stop();
    Console.WriteLine("rcvd 0 msec/message", watch.Elapsed.TotalMilliseconds / count);
    Console.WriteLine("rcvd 0 message/sec", count / watch.Elapsed.TotalSeconds);

    Console.WriteLine("press any key to continue ...");
    Console.ReadKey();

【讨论】:

Ronen 的代码运行得非常快。 (我已将 UseJournalQueue 设置为 false)。我的结果贴在下面(i5 笔记本,普通磁盘,8GB 内存):发送 0.10515438 毫秒/消息发送 9509.82736049606 消息/秒 rcvd 0.1163098 毫秒/消息 rcvd 8597.727792499 消息/秒

以上是关于使用 ReceiveById 的糟糕的 MSMQ 性能的主要内容,如果未能解决你的问题,请参考以下文章

如何在C#中使用MSMQ

MSMQ消息队列的使用

私有或公共 MSMQ

如何在 C# 中使用 MSMQ

MSMQ 单(队列)对多(侦听器)方案

如何使用 PowerShell 从 MSMQ 消息队列中删除特定消息