单一生产者单一消费者的无锁循环缓冲区

Posted

技术标签:

【中文标题】单一生产者单一消费者的无锁循环缓冲区【英文标题】:Lockless circular buffer with single producer singular consumer 【发布时间】:2019-07-25 16:07:53 【问题描述】:

我有一个不能锁定或分配内存的消费者线程和一个可以的生产者线程。 我想实现一个两个位置的循环缓冲区,以便能够从生产者向消费者线程提供数据,并且每当没有新数据可供消费时,消费者只会重新使用已经可用的数据。

这是我现在想出的:

bool newDataAvailable = false;
bool bufferEmpty = true;

foo* currentData = new foo();
foo* newData = new foo();

void consumer() 
  while(true) 
    currentData->doSomething();
    if(newDataAvailable) 
      foo* tmp = currentData;

      // Objects are swapped so the old one can be reused without additional allocations
      currentData = newData;
      newData = tmp;
      newDataAvailable = false;
      bufferEmpty = true;
    
  


void producer() 
  while(true) 
    while(!bufferEmpty)  wait(); 
    newData->init();
    bufferEmpty = false;
    newDataAvailable = true;
  

这种幼稚的实现可以吗?我知道读取和写入变量可以是非原子的,所以我应该使用原子存储,但那些会导致锁。这里需要使用原子存储吗? 另外,我想消除生产者中的主动等待,我想我可以使用std::condition_variable,但它们需要使用互斥锁,我买不起。

【问题讨论】:

【参考方案1】:

在不使用互斥锁的情况下编写共享变量的多线程代码非常困难。 见An Introduction to Lock-Free Programming,Lock Free Buffer。

如果您绝对必须避免使用互斥锁,那么我强烈建议您使用预制的无锁队列,例如Boost.lockfree 或 MPMCQueue 作为轻型非增强替代品。

我知道读取和写入变量可以是非原子的,所以我应该使用原子存储,但这些会导致锁定。

std::atomic 对于所有原始类型(不超过 CPU 的本机大小)通常是无锁的(不使用互斥锁)。 你可以通过调用std::atomic<T>::is_lock_free来检查std::atomic是否会为给定类型使用互斥锁

这里需要使用原子存储吗?

是的,当然。您需要使用互斥锁或原子。

另外,我想消除生产者中的主动等待,我想我可以使用 std::condition_variable

当您不能使用互斥锁时,您唯一的选择是使用自旋锁。 如果在您的上下文中允许,您可以在自旋锁中使用std::this_thread::yield() 以减少 CPU 负载。 (但是互斥锁可能会更快)

编辑: 只有 2 个原子的潜在解决方案是:

std::atomic<foo*> currentData = new foo();
std::atomic<foo*> newData = new foo();

void consumer() 
    foo* activeData = currentData;
    while (true) 
        activeData->doSomething();
        foo* newItem = currentData;
        if (newItem != activeData) 
            newData = activeData;
            activeData = newItem;
        
    


void producer() 
    while (true) 
        foo* reusedData = newData;
        if (!reusedData)
            continue;
        newData = nullptr;
        reusedData->init();
        currentData = reusedData;
    

【讨论】:

我不能使用互斥锁,因为消费者线程的优先级更高,等待生产者可能会导致优先级反转。你认为仅仅使用和 atomic bool(假设它实际上是无锁的)会使我的示例代码工作吗? @FrancescoBertolaccini 如果您将所有 4 个变量设为原子并正确安排加载/存储,它将是安全的。尽管请注意,使原子正确是非常困难的,并且可能会导致很多难以调试的问题。我强烈建议使用经过彻底测试的现有库。 @FrancescoBertolaccini 为什么不对消费者和生产者使用相同的优先级?由于它们的紧密耦合(消费者等待生产者生产下一个项目,生产者等待消费者处理该项目),他们无论如何都需要轮流。 因为消费者没有等待,它只是继续对旧数据进行操作。这是一个音频应用程序,所以如果消费者被阻止我会结结巴巴,但生产者可以随心所欲地提供数据,消费者会相应地进行调整 @FrancescoBertolaccini 好的,感谢您的详细解释 :) 如果您有任何问题,请告诉我 :) 原子的工作方式就像一个通道:消费者将其新项目推送到 currentData,之后,制片人会捡起它。然后生产者通过newData 变量将他的旧数据项发送回消费者。之后,消费者从newData 拿起物品,“翻新”它并通过currentData 将其推回生产者

以上是关于单一生产者单一消费者的无锁循环缓冲区的主要内容,如果未能解决你的问题,请参考以下文章

在 Folly 的无锁 SPSC 队列中使用 std::memory_order_consume

使用无锁队列(环形缓冲区)注意事项

C ++ 11中无锁的多生产者/消费者队列

在无锁实现中没有互斥锁的条件变量

dpdk无锁队列rte_ring实现分析

dpdk无锁队列rte_ring实现分析