c++11 多读取器/多写入器队列使用原子用于对象状态和永久递增索引
Posted
技术标签:
【中文标题】c++11 多读取器/多写入器队列使用原子用于对象状态和永久递增索引【英文标题】:c++11 multi-reader / multi-writer queue using atomics for object state and perpetual incremented indexes 【发布时间】:2016-06-09 22:02:22 【问题描述】:我正在使用原子和循环缓冲区来实现多读取器线程、多写入器线程对象池。
很难调查,因为检测代码会导致错误消失!
模型
生产者(或编写者线程)向Ring
请求Element
,以便“准备”元素。终止时,编写器线程会更改元素状态,以便阅读器可以“使用”它。之后,该元素再次可用于写入。
消费者(或读取器线程)向环请求对象以“读取”对象。
在“释放”对象之后,对象处于state::Ready
状态,例如可供读取器线程使用。
如果没有可用的对象,例如环中的下一个空闲对象不在state::Unused
状态,它可能会失败。
两个类,Element
和 Ring
Element
:
_state
成员从state::Unused
交换为state::LockForWrite
完成后,编写器线程将状态强制为state::Ready
(它应该是唯一处理此元素的)
要读取,rader 线程必须成功地将_state
成员从state::Ready
交换为state::LockForRead
完成后,阅读器线程将状态强制为state::Unused
(它应该是唯一处理此元素的)
总结:
作者生命周期:state::Unused
-> state::LockForWrite
-> state::Ready
读者生命周期:state::Ready
-> state::LockForRead
-> state::Unused
Ring
Element
,被视为循环缓冲区。
std::atomic<int64_t> _read, _write;
是用于通过以下方式访问元素的 2 个索引:
_elems[ _write % _elems.size() ]
写手,
_elems[ _read % _elems.size() ]
供读者阅读。
当阅读器成功LockForRead
一个对象时,_read
索引会增加。
当写入者成功LockForWrite
一个对象时,_write
索引会增加。
main
:
我们向向量中添加了一些共享相同Ring
的作者和读者线程。每个线程只是尝试 get_read 或 get_write 元素并在之后释放它们。
基于Element
转换,一切都应该没问题,但人们可以观察到环在某些时候被阻塞,就像因为环中的某些元素处于状态state::Ready
,_write % _elems.size()
索引指向它并且对称,环中的一些元素处于state::Unused
状态,_read % _elems.size()
索引指向它! 两者都 = 死锁。
#include<atomic>
#include<vector>
#include<thread>
#include<iostream>
#include<cstdint>
typedef enum : int
Unused, LockForWrite, Ready, LockForRead
state;
class Element
std::atomic<state> _state;
public:
Element():_state(Unused)
// a reader need to successfully make the transition Ready => LockForRead
bool lock_for_read() state s = Ready; return _state.compare_exchange_strong(s, LockForRead);
void unlock_read() state s = Unused; _state.store(s);
// a reader need to successfully make the transition Unused => LockForWrite
bool lock_for_write() state s = Unused; return _state.compare_exchange_strong(s, LockForWrite);
void unlock_write() state s = Ready; _state.store(s);
;
class Ring
std::vector<Element> _elems;
std::atomic<int64_t> _read, _write;
public:
Ring(size_t capacity)
: _elems(capacity), _read(0), _write(0)
Element * get_for_read()
Element * ret = &_elems[ _read.load() % _elems.size() ];
if (!ret->lock_for_read()) // if success, the object belongs to the caller thread as reader
return NULL;
_read.fetch_add(1); // success! incr _read index
return ret;
Element * get_for_write()
Element * ret = &_elems[ _write.load() % _elems.size() ];
if (!ret->lock_for_write())// if success, the object belongs to the caller thread as writer
return NULL;
_write.fetch_add(1); // success! incr _write index
return ret;
void release_read(Element* e) e->unlock_read();
void release_write(Element* e) e->unlock_write();
;
int main()
const int capacity = 10; // easy to process modulo[![enter image description here][1]][1]
std::atomic<bool> stop=false;
Ring ring(capacity);
std::function<void()> writer_job = [&]()
std::cout << "writer starting" << std::endl;
Element * e;
while (!stop)
if (!(e = ring.get_for_write()))
continue;
// do some real writer job ...
ring.release_write(e);
;
std::function<void()> reader_job = [&]()
std::cout << "reader starting" << std::endl;
Element * e;
while (!stop)
if (!(e = ring.get_for_read()))
continue;
// do some real reader job ...
ring.release_read(e);
;
int nb_writers = 1;
int nb_readers = 2;
std::vector<std::thread> threads;
threads.reserve(nb_writers + nb_readers);
std::cout << "adding writers" << std::endl;
while (nb_writers--)
threads.push_back(std::thread(writer_job));
std::cout << "adding readers" << std::endl;
while (nb_readers--)
threads.push_back(std::thread(reader_job));
// wait user key press, halt in debugger after 1 or 2 seconds
// in order to reproduce problem and watch ring
std::cin.get();
stop = true;
std::cout << "waiting all threads...\n";
for (auto & th : threads)
th.join();
std::cout << "end" << std::endl;
这个“watch debugger screeshot”在运行 1 秒后暂停了程序。如您所见,_read
指向标记为state::Unused
的元素 8,因此除了写入者之外,没有任何转换可以解除对该读取器的阻塞,但_write
索引指向状态为 state::Ready
的元素 0!
我的问题:我错过了什么?从结构上讲,我确信序列是正确的,但我缺少一些原子技巧......
操作系统测试:rhel5/gcc 4.1.2、rhel 7/gcc 4.8、win10/ms visual 2015、win10/mingw
【问题讨论】:
尝试 TSAN 进行调查。 1 个阅读器也出现死锁?如果不是,请尝试在阅读器上使用互斥锁并从那里开始调查。 【参考方案1】:Yann's answer 对这个问题的看法是正确的:如果读/写锁和索引增量之间存在延迟,您的线程可以通过乱序读写元素在序列中创建“洞”。修复是验证索引在初始读取和增量之间没有改变,la:
class Element
std::atomic<state> _state;
public:
Element():_state(Unused)
// a reader need to successfully make the transition Ready => LockForRead
bool lock_for_read()
state s = Ready;
return _state.compare_exchange_strong(s, LockForRead);
void abort_read() _state = Ready;
void unlock_read() state s = Unused; _state.store(s);
// a reader need to successfully make the transition Unused => LockForWrite
bool lock_for_write()
state s = Unused;
return _state.compare_exchange_strong(s, LockForWrite);
void abort_write() _state = Unused;
void unlock_write() state s = Ready; _state.store(s);
;
class Ring
std::vector<Element> _elems;
std::atomic<int64_t> _read, _write;
public:
Ring(size_t capacity)
: _elems(capacity), _read(0), _write(0)
Element * get_for_read()
auto i = _read.load();
Element * ret = &_elems[ i % _elems.size() ];
if (ret->lock_for_read())
// if success, the object belongs to the caller thread as reader
if (_read.compare_exchange_strong(i, i + 1))
return ret;
// Woops, reading out of order.
ret->abort_read();
return NULL;
Element * get_for_write()
auto i = _write.load();
Element * ret = &_elems[ i % _elems.size() ];
if (ret->lock_for_write())
// if success, the object belongs to the caller thread as writer
if (_write.compare_exchange_strong(i, i + 1))
return ret;
// Woops, writing out of order.
ret->abort_write();
return NULL;
void release_read(Element* e) e->unlock_read();
void release_write(Element* e) e->unlock_write();
;
【讨论】:
Casey 的解决方案似乎没问题,但也许您可以简单地放宽读/写顺序,不共享 _read 和 _write。由于当数据不可用并且在 OP 解决方案中正确实现了 Element 的锁定时,您仍然在旋转,因此每个线程具有单独的读/写(如果不可用,只需增加模大小)可能会有所帮助。所以每个线程只是扫描列表试图获取锁。原子 int 可以帮助计算剩余的待办任务,以确保在所有完成后终止。共享任务队列也存在健壮和高效的解决方案,例如doc.utwente.nl/93342/1/lace.pdf【参考方案2】:在两个共享计数器 _read 和 _write 的增量周围没有原子部分。 这对我来说看起来很糟糕,你可以毫无意义地切换另一个元素。
想象一下这个场景, 1 位读者 R1 和 1 位作者 W 合作愉快。
Reader 2 执行: Element * ret = &_elems[ _read.load() % _elems.size() ]; 并被推离cpu。
现在 R1 和 W 仍然一起玩,所以 _read 和 _write 的位置现在是任意的 w.r.t。 R2指向的元素ret。
现在在某个时间点 R2 被调度,碰巧 *ret_ 是可读的(同样可能,R1 和 W 绕着这个块转了几次)。
哎呀,如你所见,我们将读取它,并增加“_read”,但 _read 与 _ret 无关。这会创建一种孔,即尚未读取但低于 _read 索引的元素。
因此,创建临界区以确保 _read/_write 的增量在与实际锁相同的语义步骤中完成。
【讨论】:
以上是关于c++11 多读取器/多写入器队列使用原子用于对象状态和永久递增索引的主要内容,如果未能解决你的问题,请参考以下文章