在类似 LMAX 的破坏者模式中,您如何处理缓慢的消费者?

Posted

技术标签:

【中文标题】在类似 LMAX 的破坏者模式中,您如何处理缓慢的消费者?【英文标题】:In an LMAX disruptor like pattern, how do you handle a slow consumer? 【发布时间】:2014-08-12 04:36:36 【问题描述】:

我有一个问题,如果在 x86 Linux 上运行有多个生产者和单个消费者的 lmax 中断器(如环形缓冲区)中的消费者缓慢,该怎么办。使用类似环形缓冲区的 lmax 模式,您会不断地覆盖数据,但如果消费者速度很慢怎么办。因此,您如何处理在 10 大小的环形缓冲区 0-9 环形槽中,您的消费者位于槽 5 并且现在您的编写者准备开始写入槽 15,这也是缓冲区中的槽 5(即:槽5 = 15 % 10)?处理此问题的典型方法是什么,使得编写者仍然按传入的顺序生成数据,而客户端将以相同的顺序接收数据?这真的是我的问题。下面是关于我的设计的一些细节,它工作正常,只是我目前没有一个好的方法来处理这个问题。有多个线程在写,一个线程在读/p>

设计细节

我有一个环形缓冲区,该设计目前有多个生产者线程和一个消费者线程。这部分设计是存在的,目前无法更改。我正在尝试使用无锁环形缓冲区删除现有的排队系统。我所拥有的如下。

代码在 x86 Linux 上运行,编写器运行多个线程,读取器运行单个线程。读取器和写入器分开一个插槽并且是std::atomic<uint64_t>,因此读取器从插槽0 开始,写入器从插槽1 开始,然后每个写入器首先通过调用@987654323 在写入器序列上执行原子fetch_add(1, std::memory_order::memory_order_acq_rel) 来首先声明一个插槽@ 如下所示,然后使用 compare_and_swap 循环更新阅读器序列,让客户端知道此插槽可用,请参阅 updateSequence

 inline data_type incrementSequence()                                                                                        
        return m_sequence.fetch_add(1,std::memory_order::memory_order_seq_cst);                                                  
       


void updateSequence(data_type aOld, data_type aNew)                                                                         
        while ( !m_sequence.compare_exchange_weak(aOld, aNew, std::memory_order::memory_order_release, std::memory_order_relaxed)
            if  (sequence() < aNew)                                                                                             
                continue;                                                                                                        
                                                                                                                                
            break;                                                                                                               
                                                                                                                                
                       
 inline data_type sequence() const                                                                                           
        return m_sequence.load(std::memory_order::memory_order_acquire);                                                         
           
      

【问题讨论】:

【参考方案1】:

环形缓冲区(或一般的 FIFO——不一定要实现为环形缓冲区)旨在消除突发流量。即使生产者可能会突发地产生数据,但消费者可以处理稳定的输入流。

如果 FIFO 溢出,则意味着以下两种情况之一:

    您的爆发比您计划的要大。通过增加 FIFO 大小(或使其大小动态化)来解决此问题。 您的生产者跑赢了您的消费者。通过增加用于消耗数据的资源(可能更多线程)来解决此问题。

在我看来,您目前正处于第二阶段:您的单一消费者根本不够快,无法跟上生产者的步伐。在这种情况下,唯一真正的选择是通过优化单个消费者或添加更多消费者来加速消费。

这也听起来有点好像您的消费者在处理数据时可能会将他们的输入数据留在 FIFO 中,因此 FIFO 中的位置一直被占用直到消费者完成 处理该输入。如果是这样,您可以通过简单地让消费者在它开始处理时从 FIFO 中删除输入数据来解决您的问题。这会释放该插槽,以便生产者可以继续将输入放入缓冲区。

还有一点:使 FIFO 大小动态化可能是个问题。问题很简单:它可以掩盖这样一个事实,即您确实存在第二个问题,即没有必要的资源来处理消费者端的数据。

假设生产者和消费者都是线程池,平衡系统的最简单方法通常是使用固定大小的 FIFO。如果生产者开始远远领先于消费者以至于 FIFO 溢出,那么生产者开始阻塞。这让消费者线程池消耗更多计算资源(例如,在更多内核上运行)以赶上生产者。但是,这确实取决于能够添加更多消费者,而不是将系统限制为单个消费者。

【讨论】:

以上是关于在类似 LMAX 的破坏者模式中,您如何处理缓慢的消费者?的主要内容,如果未能解决你的问题,请参考以下文章

您如何处理公共 git 存储库中的敏感数据?

您如何处理 fetchxml 结果数据?

您如何处理模型类中的外键关系

您如何处理深度链接插件中的简历场景?

您如何处理排序、分页和过滤的参数?

您如何处理 Spring Data JPA 中 ID 数组的批量删除?