C ++中具有双缓冲区的单生产者,单消费者数据结构
Posted
技术标签:
【中文标题】C ++中具有双缓冲区的单生产者,单消费者数据结构【英文标题】:Single producer, single consumer data structure with double buffer in C++ 【发布时间】:2014-07-03 04:22:40 【问题描述】:我在 $work 有一个应用程序,我必须在两个按不同频率调度的实时线程之间移动。 (实际的调度超出了我的控制。)应用程序是硬实时的(其中一个线程必须驱动硬件接口),因此线程之间的数据传输应该是无锁和无等待的尽可能。
需要注意的是,只需要传输一个块数据:因为两个线程运行的速率不同,在慢速线程的两次唤醒之间会有两次快线程迭代完成的时候;在这种情况下,可以覆盖写入缓冲区中的数据,以便较慢的线程仅获取最新数据。
换句话说,代替队列,双缓冲解决方案就足够了。这两个缓冲区是在初始化期间分配的,读写线程可以调用该类的方法来获取指向其中一个缓冲区的指针。
C++ 代码:
#include <mutex>
template <typename T>
class ProducerConsumerDoubleBuffer
public:
ProducerConsumerDoubleBuffer()
m_write_busy = false;
m_read_idx = m_write_idx = 0;
~ProducerConsumerDoubleBuffer()
// The writer thread using this class must call
// start_writing() at the start of its iteration
// before doing anything else to get the pointer
// to the current write buffer.
T * start_writing(void)
std::lock_guard<std::mutex> lock(m_mutex);
m_write_busy = true;
m_write_idx = 1 - m_read_idx;
return &m_buf[m_write_idx];
// The writer thread must call end_writing()
// as the last thing it does
// to release the write busy flag.
void end_writing(void)
std::lock_guard<std::mutex> lock(m_mutex);
m_write_busy = false;
// The reader thread must call start_reading()
// at the start of its iteration to get the pointer
// to the current read buffer.
// If the write thread is not active at this time,
// the read buffer pointer will be set to the
// (previous) write buffer - so the reader gets the latest data.
// If the write buffer is busy, the read pointer is not changed.
// In this case the read buffer may contain stale data,
// it is up to the user to deal with this case.
T * start_reading(void)
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_write_busy)
m_read_idx = m_write_idx;
return &m_buf[m_read_idx];
// The reader thread must call end_reading()
// at the end of its iteration.
void end_reading(void)
std::lock_guard<std::mutex> lock(m_mutex);
m_read_idx = m_write_idx;
private:
T m_buf[2];
bool m_write_busy;
unsigned int m_read_idx, m_write_idx;
std::mutex m_mutex;
;
为避免阅读器线程中的数据过时,有效负载结构已进行版本控制。 为了促进线程之间的双向数据传输,使用了上述怪物的两个实例,方向相反。
问题:
这个方案是线程安全的吗?如果坏了,在哪里? 可以在没有互斥锁的情况下完成吗?也许只有内存屏障或 CAS 指令? 可以做得更好吗?【问题讨论】:
我喜欢这个问题。我下班后看一下,明天某个时候回复你。 (我已经实现了其他无锁数据结构,我几乎可以肯定这可以毫不费力地快速完成。) 我也喜欢这个问题。到目前为止似乎是正确的。有有趣的约束,可能允许非常快速的实现。我也会有这个用处。但我还不确定一个好的(更好的)实现。 为什么不使用循环缓冲区?我想这会简化获取最新数据的问题。 好吧...这比我想象的要复杂 :-) 我正在执行一个实现,会及时通知您。 @Chad:如果您使用显式获取和释放内存屏障而不是默认的顺序一致的内存屏障,您将获得更好的性能(在 x86 上要好得多)。此外,%
会减慢速度——考虑将大小强制为 2 的幂,以便您可以改用&
。 Facebook's folly
queue,顺便说一句,具有相同的语义,但我提到的所有改进(他们使用 if
而不是 %
并依赖于分支预测):-)
【参考方案1】:
非常有趣的问题!比我最初想象的要复杂得多:-) 我喜欢无锁解决方案,所以我尝试在下面解决一个。
有很多方法可以考虑这个系统。你可以建模 它作为一个固定大小的循环缓冲区/队列(有两个条目),但随后 您失去更新下一个可用消费值的能力, 因为你不知道消费者是否已经开始阅读最近发布的 值或仍在(可能)读取前一个。所以额外的状态 需要超出标准环形缓冲区才能达到更佳 解决方案。
首先请注意,生产者始终可以安全地写入一个单元格 在任何给定时间点;如果消费者正在读取一个单元格,则 其他可以写。让我们称可以安全写入的单元格 “活动”单元格(可以潜在读取的单元格是任何单元格不是 活跃的)。只有在另一个单元不存在的情况下,才能切换活动单元 当前正在读取。
与始终可以写入的活动单元格不同,非活动单元格可以 只有当它包含一个值时才被读取;一旦该值被消耗,它就消失了。 (这意味着在激进的生产者的情况下避免了活锁;在某些情况下 点,消费者将清空一个单元格并将停止触摸单元格。一次 发生这种情况时,生产者肯定可以发布一个值,而在此之前, 如果消费者不在,它只能发布一个值(更改活动单元格) 读到一半。)
如果存在 一个可以使用的值,则只有消费者可以更改它 事实(无论如何,对于非活动单元);随后的生产可能会改变哪个单元格 处于活动状态且已发布值,但始终准备好读取一个值,直到 它被消耗了。
一旦生产者完成对活动单元格的写入,它可以“发布”这个值 更改哪个单元格是活动单元格(交换索引),前提是消费者是 不在阅读另一个单元格的过程中。如果消费者是在 读取另一个单元格,交换不会发生,但在这种情况下,消费者可以交换 在 它 完成读取值之后,前提是生产者不在 写入(如果是,生产者将在完成后交换)。 事实上,一般来说,消费者在读完之后总是可以交换的(如果它是唯一的 访问系统),因为消费者的虚假交换是良性的:如果存在 另一个单元格中的某些内容,然后交换将导致接下来读取该内容,如果 没有,交换没有任何影响。
所以,我们需要一个共享变量来跟踪活动单元格是什么,我们还需要一个 生产者和消费者都可以指示他们是否处于 手术。我们可以将这三种状态按顺序存储到一个原子变量中 能够一次(原子地)影响它们。 我们还需要一种让消费者检查是否有任何东西的方法 首先是非活动单元,并且两个线程都修改该状态 作为适当的。我尝试了其他一些方法,但最后最简单的方法就是 将此信息也包含在其他原子变量中。这让事情变得很多 推理起来更简单,因为系统中的所有状态变化都是原子的。
我想出了一个无等待的实现(无锁,所有操作都完成 在有限数量的指令中)。
代码时间!
#include <atomic>
#include <cstdint>
template <typename T>
class ProducerConsumerDoubleBuffer
public:
ProducerConsumerDoubleBuffer() : m_state(0)
~ProducerConsumerDoubleBuffer()
// Never returns nullptr
T* start_writing()
// Increment active users; once we do this, no one
// can swap the active cell on us until we're done
auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
return &m_buf[state & 1];
void end_writing()
// We want to swap the active cell, but only if we were the last
// ones concurrently accessing the data (otherwise the consumer
// will do it for us when *it's* done accessing the data)
auto state = m_state.load(std::memory_order_relaxed);
std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
if ((state & 0x6) == 0)
// The consumer wasn't in the middle of a read, we should
// swap (unless the consumer has since started a read or
// already swapped or read a value and is about to swap).
// If we swap, we also want to clear the full flag on what
// will become the active cell, otherwise the consumer could
// eventually read two values out of order (it reads a new
// value, then swaps and reads the old value while the
// producer is idle).
m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
// Returns nullptr if there appears to be no more data to read yet
T* start_reading()
m_readState = m_state.load(std::memory_order_relaxed);
if ((m_readState & (0x10 >> (m_readState & 1))) == 0)
// Nothing to read here!
return nullptr;
// At this point, there is guaranteed to be something to
// read, because the full flag is never turned off by the
// producer thread once it's on; the only thing that could
// happen is that the active cell changes, but that can
// only happen after the producer wrote a value into it,
// in which case there's still a value to read, just in a
// different cell.
m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;
// Now that we've incremented the user count, nobody can swap until
// we decrement it
return &m_buf[(m_readState & 1) ^ 1];
void end_reading()
if ((m_readState & (0x10 >> (m_readState & 1))) == 0)
// There was nothing to read; shame to repeat this
// check, but if these functions are inlined it might
// not matter. Otherwise the API could be changed.
// Or just don't call this method if start_reading()
// returns nullptr -- then you could also get rid
// of m_readState.
return;
// Alright, at this point the active cell cannot change on
// us, but the active cell's flag could change and the user
// count could change. We want to release our user count
// and remove the flag on the value we read.
auto state = m_state.load(std::memory_order_relaxed);
std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1)
// Oi, we were the last ones accessing the data when we released our cell.
// That means we should swap, but only if the producer isn't in the middle
// of producing something, and hasn't already swapped, and hasn't already
// set the flag we just reset (which would mean they swapped an even number
// of times). Note that we don't bother swapping if there's nothing to read
// in the other cell.
m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
private:
T m_buf[2];
// The bottom (lowest) bit will be the active cell (the one for writing).
// The active cell can only be switched if there's at most one concurrent
// user. The next two bits of state will be the number of concurrent users.
// The fourth bit indicates if there's a value available for reading
// in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
std::atomic<std::uint32_t> m_state;
std::uint32_t m_readState;
;
请注意,语义是这样的,即消费者永远不能两次读取给定值, 它读取的值总是比它读取的最后一个值更新。也是相当的 内存使用效率高(两个缓冲区,如您的原始解决方案)。我避免了 CAS 循环 因为它们通常比单个原子操作的竞争效率低。
如果你决定使用上面的代码,我建议你先为它写一些全面的(线程化的)单元测试。 和适当的基准。我确实测试了它,但只是勉强。如果您发现任何错误,请告诉我:-)
我的单元测试:
ProducerConsumerDoubleBuffer<int> buf;
std::thread producer([&]()
for (int i = 0; i != 500000; ++i)
int* item = buf.start_writing();
if (item != nullptr) // Always true
*item = i;
buf.end_writing();
);
std::thread consumer([&]()
int prev = -1;
for (int i = 0; i != 500000; ++i)
int* item = buf.start_reading();
if (item != nullptr)
assert(*item > prev);
prev = *item;
buf.end_reading();
);
producer.join();
consumer.join();
至于您最初的实现,我只是粗略地看了看(它更有趣 设计新东西,呵呵),但 david.pfx 的回答似乎解决了你的问题。
【讨论】:
谢谢!干得好!不幸的是,在我的应用程序中,希望阅读器 可以 读取相同的值两次(对于较慢的线程将数据发送到较快的线程的情况)。如果我要使用这个解决方案,我将不得不重新考虑快速线程的阅读器部分的实现。此外,我发现处理内部状态的魔法难以理解和脆弱。例如。重复调用这四种方法中的任何一种都会弄乱状态(我不知道这是否构成错误,但它至少是实现的薄弱环节)。 另外,在我看来,在某些情况下,整个状态变化并不是原子的:你获取 m_state 的值,用它做一些魔术,然后稍后再将它存储回来一些指令。跨度> @user:我实现了最强的设计约束。如果您希望读者继续阅读旧值直到发布新值,只需添加一个缓存项并在成功读取新值后设置它;然后当begin_reading
中的值不可用时,返回指向该值的指针而不是 nullptr(当然,前提是它首先被设置)。无需重新考虑;-)
@user:我同意位操作有点难以理解,但据我所知它是正确的。我不确定您所说的“重复调用”是什么意思,但是对 begin_x 的任何调用肯定应该与对 end_x 的调用匹配?至于状态变化,每个变化本身都是原子的,即使完整的状态变化分布在几个原子更新中。这是因为保证在用户计数高于零时不会进行交换。在任何一个时间点,整个状态都是有效的。我将添加我的单元测试供您阅读。
@user:是的,位操作很脆弱。但是 any 无锁数据结构一开始就非常脆弱——如果不考虑整个算法,就不能考虑任何一行——当多个涉及线程 :-) (当然,除非您正在编辑与同步无关的部分,例如添加缓存的读取值以使第一个值始终可供读取)阅读。)【参考方案2】:
是的,我认为它坏了。
如果读取器连续执行开始/结束/启动,它会将其读取索引更新为写入索引,并可能从写入索引读取数据,即使写入繁忙也是如此。
问题本质上是写入器不知道读取器将使用哪个缓冲区,因此写入器应确保两个缓冲区始终有效。如果需要任何时间将数据写入缓冲区,它就无法做到这一点[除非我误解了此处未显示的一些逻辑。]
是的,我认为它可以在没有锁的情况下使用 CAS 或等效逻辑来完成。我不会尝试在这个空间中表达算法。我确信它存在,但不是我可以第一次正确地写出来。一些网络搜索出现了一些看似合理的候选人。使用 CAS 的无等待 IPC 似乎是一个非常有趣的话题,也是一些研究的主题。
经过进一步思考,算法如下。你需要:
3 个缓冲区:一个供写入者使用,一个供读者使用,一个额外。 缓冲区是有序的:它们形成一个环(但请参阅注释)。 每个缓冲区的状态:空闲、已满、正在写入、正在读取。 一个可以检查缓冲区状态并在单个原子操作中有条件地将状态更改为不同值的函数。我将为此使用 CSET。作者:
Find the first buffer that is FREE or FULL
Fail: assert (should never fail, reader can only use one buffer)
CSET buffer to WRITING
Write into the buffer
CSET buffer to FULL
读者:
Find first buffer that is FULL
Fail: wait (writer may be slow)
CSET buffer to READING
Read and consume buffer
CSET buffer to FREE
注意:此算法不保证缓冲区按到达顺序严格处理,并且没有简单的更改可以做到这一点。如果这很重要,则应在缓冲区上使用由写入器设置的序列号来增强算法,以便读取器可以选择最近的缓冲区。
我将代码保留为实现细节。
CSET 函数很重要。它必须以原子方式测试特定共享内存位置是否等于预期值,如果是,则将其更改为新值。如果成功进行更改,则返回 true,否则返回 false。如果两个线程同时访问同一个位置(并且可能在不同的处理器上),实现必须避免竞争条件。
C++ 标准原子操作库包含一组 atomic_compare_exchange 函数,如果可用的话,它们应该可以达到目的。
【讨论】:
啊,你是对的。可以通过在 end_writing() 中添加对繁忙标志的检查来修补将写入缓冲区暴露给写入器的开始/结束/开始序列的问题,但这会导致不同的问题:对于某些时序组合是可能的阅读器线程永远不会获取任何新数据,因为它总是在编写器忙时被调用。 我认为您的 4 个状态缓冲区(FREE、WRITING、FULL、READING)只有原子状态的变化是 解决方案。但是恕我直言,应该只使用 2 个缓冲区,如果您想确保遵守排序,只需稍微修改“结束写入”即可:当将缓冲区设置为 FULL 时,如果另一个缓冲区也为 FULL,则应设置免费。这保证了在给定时间只有一个缓冲区可以是 FULL,并且与应同时处理只有一个缓冲区的假设相一致。 @SergeBallesta:要求是尽量减少等待。使用 3 个缓冲区,写入器从不等待,读取器从不等待,如果有数据。 @david.pfx ;你是对的,但是有 2 个缓冲区,作家永远不会等待:如果一个缓冲区正在读取而另一个是 FULL,则重新使用 FULL。我承认读取器可能永远不会看到它可以处理的缓冲区,但前提是写入器正在写入 - 因此缓冲区已经包含旧数据 - 并且读取器刚刚结束读取。 看来我确实需要 3 个缓冲区,如果我想保证无论作者处于活动状态,读者总是有一个缓冲区。但是,您确定“设置状态”操作是原子的就足够了吗?例如。读取器进程可能会在写入器找到它并将其状态设置为 WRITING 之间劫持一个 FULL 缓冲区。【参考方案3】:这里是使用 InterlockedExchangePointer()
和 SLIST 的版本。
此解决方案不支持重新读取最后一个缓冲区。但如果需要,可以通过副本和if( NULL == doubleBuffer.beginReader(...) ) use backup copy ...
在读者端完成。
这样做不是因为它很难添加,而是因为它不是很现实。想象一下,您的最后一个已知值变得越来越老——几秒钟、几天、几周。应用程序不太可能仍然想要使用它。因此,将重读功能纳入双缓冲区代码会降低应用程序的灵活性。
双缓冲区有 1 个读指针成员。每当调用 beginRead() 时,都会返回此值并自动替换为 NULL。可以将其视为“读者使用缓冲区。”
使用endRead()
,读取器返回缓冲区并将其添加到 SLIST,其中包含可用于写入操作的缓冲区。
最初,两个缓冲区都添加到 SLIST,读取指针为 NULL。
beginWrite()
从 SLIST 中弹出下一个可用缓冲区。由于endWrite()
的实现方式,该值永远不能为NULL。
最后同样重要的是,endWrite()
以原子方式将读取指针与返回的新写入缓冲区交换,如果读取指针不为 NULL,则将其推送到 SLIST。
因此,即使读取端从不读取,写入端也永远不会耗尽缓冲区。当阅读器阅读时,它会获得最新的已知值(一次!)。
如果有多个并发读取器或写入器,则此实现不安全。但这不是最初的目标。
在丑陋的一面,缓冲区需要是顶部带有一些 SLIST_HEADER 成员的结构。
这里是代码,但请记住,如果你的火星探测器降落在金星上,这不是我的错!
const size_t MAX_DATA_SIZE = 512;
typedef
//__declspec(align(MEMORY_ALLOCATION_ALIGNMENT))
struct DataItem_tag
SLIST_ENTRY listNode;
uint8_t data[MAX_DATA_SIZE];
size_t length;
DataItem_t;
class CDoubleBuffer
SLIST_HEADER m_writePointers;
DataItem_t m_buffers[2];
volatile DataItem_t *m_readPointer;
public:
CDoubleBuffer()
: m_writePointers()
, m_buffers()
, m_readPointer(NULL)
InitializeSListHead(&m_writePointers);
InterlockedPushEntrySList(&m_writePointers, &m_buffers[0].listNode);
InterlockedPushEntrySList(&m_writePointers, &m_buffers[1].listNode);
DataItem_t *beginRead()
DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, NULL));
return result;
void endRead(DataItem_t *dataItem)
if (NULL != dataItem)
InterlockedPushEntrySList(&m_writePointers, &dataItem->listNode);
DataItem_t *beginWrite()
DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedPopEntrySList(&m_writePointers));
return result;
void endWrite(DataItem_t *dataItem)
DataItem_t *oldReadPointer = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, dataItem));
if (NULL != oldReadPointer)
InterlockedPushEntrySList(&m_writePointers, &oldReadPointer->listNode);
;
这里是它的测试代码。 (对于上面的代码和测试代码,您需要
CDoubleBuffer doubleBuffer;
DataItem_t *readValue;
DataItem_t *writeValue;
// nothing to read yet. Make sure NULL is returned.
assert(NULL == doubleBuffer.beginRead());
doubleBuffer.endRead(NULL); // we got nothing, we return nothing.
// First write without read
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 0;
doubleBuffer.endWrite(writeValue);
// Second write without read
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 1;
doubleBuffer.endWrite(writeValue);
// Third write without read - works because it reuses the old buffer for the new write.
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 2;
doubleBuffer.endWrite(writeValue);
readValue = doubleBuffer.beginRead();
assert(NULL != readValue); // NULL would obviously be a terrible bug.
assert(2 == readValue->length); // We got the latest and greatest?
doubleBuffer.endRead(readValue);
readValue = doubleBuffer.beginRead();
assert(NULL == readValue); // We expect NULL here. Re-reading is not a feature of this implementation!
doubleBuffer.endRead(readValue);
【讨论】:
以上是关于C ++中具有双缓冲区的单生产者,单消费者数据结构的主要内容,如果未能解决你的问题,请参考以下文章