为啥从队列中并行读取消息很慢?

Posted

技术标签:

【中文标题】为啥从队列中并行读取消息很慢?【英文标题】:Why it is very slow to read the message from the queue in parallel?为什么从队列中并行读取消息很慢? 【发布时间】:2019-11-16 16:10:21 【问题描述】:

我开发了一个 Windows 服务,它每秒在 MSMQ 中写入 1500 条消息。

在另一项服务中,我正在并行读取队列中的消息并进行处理。但是从队列中读取太慢了,我认为瓶颈是 queue.Receive(TimeSpan.Zero)

不知道为什么阅读慢?

我的服务在具有良好处理能力的服务器上运行。

这是我的代码。

static Task Main()

    GetFromQueueAsync();



private static Task GetFromQueueAsync()

    string queueName = ConfigurationManager.AppSettings["QueueName"].ToString();

    while (true)
    
        var blockArray = Enumerable.Range(0, 30).ToArray();

        Parallel.ForEach(blockArray, (i) =>
        
            MessageQueue queue = new MessageQueue(queueName);

            try
                             
                var message = queue.Receive(TimeSpan.Zero);
                
                message.Formatter = new BinaryMessageFormatter();
                var labelParts = message.Label.Split('_');

                var isValidMessageAddress=  Validate(labelParts);

                if (isValidMessageAddress)
                
                    //call my sysnc method
                
        
        catch (MessageQueueException mqex)
        
            if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
            
                return;
            
            else throw;
        
    );

【问题讨论】:

我的主要问题是:为什么不在队列上使用常规循环,并并行运行实际工作?看来您可能会为进入队列而与自己抗争。 我不知道,你的想法是什么?我必须从队列中读取并处理消息。在队列中写入永远不会停止@John 在单个线程中从队列中读取,并将工作分派给其他线程。 将队列new MessageQueue(queueName); 的创建移到循环之外有什么不同吗? @TheodorZoulias 哇,是的。谢谢西奥多。 【参考方案1】:

问:不知道为什么阅读慢?

让我们首先同意,每个流程实现都有一些性能上限,即使使用无限多的功率也是无法打破的。

接下来,让我们注意,任何 Queue 的概念本质上都是 SERIAL,而不是 CONCURRENT(并发可能会被添加到纯-SERIAL-one-after-another-after-another 消息检索中,但是需要额外的成本,性能方面(吞吐量明显降低,从而成为能够接收 + 信号化 + 处理和控制许多(现在)CONCURRENT-read 请求,内部仍然是纯串行队列头端,用于安全的去队列过程和交付)和 延迟- 明智的(延迟明显增加,通常比原始的、复杂的 pure-SERIAL,垄断者独有的低延迟消息访问机制高很多倍)。

话虽如此,真正的PARALLEL 进程调度(all-read-all-messages-at-the-same-moment (all-served-at-once, synchronously, one may add))根本不会发生在单个队列实例头端,永远不会。

解决办法:

为了更快的吞吐量,最好的方法是在队列实例上保持纯串行、低延迟的头端处理,并将每条消息分派给其他工作线程以处理卸载的消息,独立于仍在(是的,仍然并且永远是)内部等待的其他消息纯-SERIAL消息队列。

有很多方法,如何调度已经卸载的消息内容以进行进一步处理,这种选择取决于您的架构和设计决策。 inproc://ipc:// (protocol-stack-less) 线程间或进程间通信方式都可以很好地利用数百个处理线程或如果您的应用程序需要增长和扩展性能,同时保持尽可能低的延迟,则足够多(并置甚至广泛重新分布)的消息内容处理过程。

当心所有附加成本仍然留在 while(true)...-无限循环中,任何这样的,重复的越多,就会退化 Amdahl's Law computed PARALLEL-code Speedup:

原样代码(由 @TheodorZoulias 报告)重复并重复所有实例化 - 确实没有理由重新实例化并丢弃并重新实例化所有 30 个提议的队列头端,每个每个循环运行。这似乎是有史以来最糟糕的步骤顺序和最糟糕的资源管理:

        while (true) //--------------------------------- INFINITE LOOP
                    //----- NEW ARRAY                CREATED PER-LOOP
            var blockArray = Enumerable.Range(0, 30).ToArray();
                     //----- NEW .ForEach()...      CREATED PER-LOOP                
            // |||||||||||||||||||||||||||||||||||||||||||||||||||||||PAR||| BLOCK
            Parallel.ForEach(blockArray, (i) =>    // ||||||||||||||||PAR||| BLOCK
                    //----- NEW QUEUE                CREATED PER-LOOP x PER-BLOCK
                MessageQueue queue = new MessageQueue(queueName);
                try
                   var message = queue.Receive(TimeSpan.Zero);
                    message.Formatter = new BinaryMessageFormatter();
                    var labelParts = message.Label.Split('_');
                    var isValidMessageAddress=  Validate(labelParts);
                    if (isValidMessageAddress)
                    
                     //--------------------- PER MESSAGE PAYLOAD PROCESSING START
                     // call my sysnc method
                     //--------------------- PER MESSAGE PAYLOAD PROCESSING END
                    
                
                catch (MessageQueueException mqex)
                   if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                       return; //-- <<<< ???
                      //------------ EACH IOTimeout KILLS +ONE POOL-MEMBER
                      //------------ 30th IOTimeout LEAVES THE POOL EMPTY
                    
                    else throw;
                    //-------------- NO ERROR HANDLING
                
            );
            // |||||||||||||||||||||||||||||||||||||||||||||||||||||||PAR||| BLOCK
         //-------------------------------------------- INFINITE-LOOP

可能需要重新制定(加上变量的重用/预分配可能会更有帮助):

         // |||||||||||||||||||||||||||||||||||||||||||||||||||||||||:PAR||| BLOCK
            Parallel.ForEach(blockArray, (i) =>    // |||||||||||||||:PAR||| BLOCK
                    //--- A QUEUE HEAD-END           CREATED ONLY ONCE! PER-BLOCK
                MessageQueue queue = new MessageQueue(queueName);  //:PAR||| BLOCK
                while (true) //----PER-AGENT's [SERIAL]READING LOOP//:PAR||| BLOCK
                                                                  //:PAR||| BLOCK
                    try      //----PER-AGENT's TRY               //:PAR||| BLOCK
                       var            message = queue.Receive( TimeSpan.Zero );
                                       message.Formatter = new BinaryMessageFormatter();
                        if ( Validate( message.Label.Split( '_' )))//:PAR||| BLOCK
                          //------PER-AGENT's---- PER MESSAGE     //:PAR||| BLOCK
                           // call my sysnc method                 //:PAR||| BLOCK
                           //------PER-AGENT's---- PER MESSAGE     //:PAR||| BLOCK
                                                                  //:PAR||| BLOCK
                                                                  //:PAR||| BLOCK
                    catch ...                                      //:PAR||| BLOCK
                    //-------------PER-AGENT's PER EXCEPTION HANDLING:PAR||| BLOCK
                    //                         WITHOUT ANY POOL AGENT:PAR||| BLOCK
                    //                         CANNIBALISATION       :PAR||| BLOCK
                 // --------------PER-AGENT's [SERIAL]READING LOOP  :PAR||| BLOCK
             //|||||||||||||||||||||||||||||||||||||||||||||||||||||:PAR||| BLOCK
         // |||||||||||||||||||||||||||||||||||||||||||||||||||||||||:PAR||| BLOCK

让更多线程用于移动更多消息 - 性能扩展

一旦代码设计概念清晰和健全,如果队列头端管理是问题,高性能消息传递框架允许增加队列专用 I/O 线程的数量。可实现的性能信封远高于每秒几条 k[msg/s] 到几条 ~ 10.000 ~ 100.000 ~ 1.000.000 消息,因此每秒有几千条消息排队等待取消排队绝对不是问题,如果实施了适当的工程(假设队列没有移动一些确实非常大的 BLOB-s,另外一些零复制/指针移动技巧将不得不发生,以保持速度)

【讨论】:

【参考方案2】:

这可能不会缩短您阅读消息的时间,但会为您节省一些使用 Parallel.ForEach 创建的工作线程,这些线程会阻塞,等待消息。您基本上可以订阅新消息并分派您在线程池上进行的处理,并最终拥有更多的同时订阅数量,Parallel.ForEach 将执行此操作。

    MessageQueue queue = new MessageQueue(queueName);
    private void pollQueue(string queueName, int numberOfSimultaneousRequests)
    
        for (int i = 0; i < numberOfSimultaneousRequests; i++)
        
            queue.BeginReceive(TimeSpan.MaxValue, null, ProcessMessage);
        
    

    private void ProcessMessage(IAsyncResult asyncResult)
    
        try
        
            var message = queue.EndReceive(asyncResult);
            //do something with the message here
            //OR
            ThreadPool.QueueUserWorkItem(o =>
            
                //do something with the message in another threadpool thread
            , message);
        
        catch (Exception ex)
        
            //handle error and log

        
        finally
        
            //subscribe again for another message
            queue.BeginReceive(TimeSpan.MaxValue, null, ProcessMessage);
        
    

【讨论】:

以上是关于为啥从队列中并行读取消息很慢?的主要内容,如果未能解决你的问题,请参考以下文章

从远程队列中读取消息

如何使用 Python 中的 stomp 库从队列中读取所有消息?

从 mule 中的队列/主题中读取消息

消息如何从 ActiveMQ 读取并传递到相应的队列?

Apache Flume - 由多个使用者从单个消息队列中提取数据

从 AWS Lambda 读取 SQS 队列