c++11 多读取器/多写入器队列使用原子用于对象状态和永久递增索引

Posted

技术标签:

【中文标题】c++11 多读取器/多写入器队列使用原子用于对象状态和永久递增索引【英文标题】:c++11 multi-reader / multi-writer queue using atomics for object state and perpetual incremented indexes 【发布时间】:2016-06-09 22:02:22 【问题描述】:

我正在使用原子和循环缓冲区来实现多读取器线程、多写入器线程对象池。

很难调查,因为检测代码会导致错误消失!

模型

生产者(或编写者线程)向Ring 请求Element,以便“准备”元素。终止时,编写器线程会更改元素状态,以便阅读器可以“使用”它。之后,该元素再次可用于写入。

消费者(或读取器线程)向环请求对象以“读取”对象。 在“释放”对象之后,对象处于state::Ready 状态,例如可供读取器线程使用。 如果没有可用的对象,例如环中的下一个空闲对象不在state::Unused 状态,它可能会失败。

两个类,ElementRing

Element

要写入,写入线程必须成功地将_state 成员从state::Unused 交换为state::LockForWrite 完成后,编写器线程将状态强制为state::Ready(它应该是唯一处理此元素的) 要读取,rader 线程必须成功地将_state 成员从state::Ready 交换为state::LockForRead 完成后,阅读器线程将状态强制为state::Unused(它应该是唯一处理此元素的)

总结:

作者生命周期:state::Unused -> state::LockForWrite -> state::Ready 读者生命周期:state::Ready -> state::LockForRead -> state::Unused

Ring

的向量为 Element ,被视为循环缓冲区。 std::atomic<int64_t> _read, _write; 是用于通过以下方式访问元素的 2 个索引: _elems[ _write % _elems.size() ] 写手, _elems[ _read % _elems.size() ] 供读者阅读。

当阅读器成功LockForRead 一个对象时,_read 索引会增加。 当写入者成功LockForWrite 一个对象时,_write 索引会增加。

main

我们向向量中添加了一些共享相同Ring 的作者和读者线程。每个线程只是尝试 get_read 或 get_write 元素并在之后释放它们。

基于Element 转换,一切都应该没问题,但人们可以观察到环在某些时候被阻塞,就像因为环中的某些元素处于状态state::Ready_write % _elems.size() 索引指向它并且对称,环中的一些元素处于state::Unused 状态,_read % _elems.size() 索引指向它! 两者都 = 死锁

#include<atomic>
#include<vector>
#include<thread>
#include<iostream>
#include<cstdint>

typedef enum : int

    Unused, LockForWrite, Ready,  LockForRead
state;

class Element

    std::atomic<state> _state;
public:
    Element():_state(Unused) 

    // a reader need to successfully make the transition Ready => LockForRead
    bool lock_for_read()  state s = Ready; return _state.compare_exchange_strong(s, LockForRead); 
    void unlock_read()  state s = Unused; _state.store(s); 

    // a reader need to successfully make the transition Unused => LockForWrite
    bool lock_for_write()  state s = Unused; return _state.compare_exchange_strong(s, LockForWrite); 
    void unlock_write()  state s = Ready;  _state.store(s); 
;

class Ring

    std::vector<Element> _elems;
    std::atomic<int64_t> _read, _write;

public:
    Ring(size_t capacity)
        : _elems(capacity), _read(0), _write(0) 

    Element * get_for_read() 
        Element * ret = &_elems[ _read.load() % _elems.size() ];
        if (!ret->lock_for_read()) // if success, the object belongs to the caller thread as reader
            return NULL;
        _read.fetch_add(1); // success! incr _read index 
        return ret;
    
    Element * get_for_write() 
        Element * ret = &_elems[ _write.load() % _elems.size() ];
        if (!ret->lock_for_write())// if success, the object belongs to the caller thread as writer
            return NULL;
        _write.fetch_add(1); // success! incr _write index
        return ret;
    
    void release_read(Element* e)  e->unlock_read();
    void release_write(Element* e)  e->unlock_write();
;

int main()


    const int capacity = 10; // easy to process modulo[![enter image description here][1]][1]

    std::atomic<bool> stop=false;

    Ring ring(capacity);

    std::function<void()> writer_job = [&]()
    
        std::cout << "writer starting" << std::endl;
        Element * e;
        while (!stop)
        
            if (!(e = ring.get_for_write())) 
                continue;
            // do some real writer job ...
            ring.release_write(e);
        
    ;
    std::function<void()> reader_job = [&]()
    
        std::cout << "reader starting" << std::endl;
        Element * e;
        while (!stop)
        
            if (!(e = ring.get_for_read())) 
                continue;
            // do some real reader job ...
            ring.release_read(e);
        
    ;

    int nb_writers = 1;
    int nb_readers = 2;

    std::vector<std::thread> threads;
    threads.reserve(nb_writers + nb_readers);

    std::cout << "adding writers" << std::endl;
    while (nb_writers--)
        threads.push_back(std::thread(writer_job));

    std::cout << "adding readers" << std::endl; 
    while (nb_readers--)
        threads.push_back(std::thread(reader_job));

    // wait user key press, halt in debugger after 1 or 2 seconds
    // in order to reproduce problem and watch ring
    std::cin.get();

    stop = true;

    std::cout << "waiting all threads...\n";
    for (auto & th : threads)
        th.join();

    std::cout << "end" << std::endl;

这个“watch debugger screeshot”在运行 1 秒后暂停了程序。如您所见,_read 指向标记为state::Unused 的元素 8,因此除了写入者之外,没有任何转换可以解除对该读取器的阻塞,但_write 索引指向状态为 state::Ready 的元素 0!

我的问题:我错过了什么?从结构上讲,我确信序列是正确的,但我缺少一些原子技巧......

操作系统测试:rhel5/gcc 4.1.2、rhel 7/gcc 4.8、win10/ms visual 2015、win10/mingw

【问题讨论】:

尝试 TSAN 进行调查。 1 个阅读器也出现死锁?如果不是,请尝试在阅读器上使用互斥锁并从那里开始调查。 【参考方案1】:

Yann's answer 对这个问题的看法是正确的:如果读/写锁和索引增量之间存在延迟,您的线程可以通过乱序读写元素在序列中创建“洞”。修复是验证索引在初始读取和增量之间没有改变,la:

class Element

    std::atomic<state> _state;
public:
    Element():_state(Unused) 

    // a reader need to successfully make the transition Ready => LockForRead
    bool lock_for_read() 
        state s = Ready;
        return _state.compare_exchange_strong(s, LockForRead);
    
    void abort_read()  _state = Ready; 
    void unlock_read()  state s = Unused; _state.store(s); 

    // a reader need to successfully make the transition Unused => LockForWrite
    bool lock_for_write() 
        state s = Unused;
        return _state.compare_exchange_strong(s, LockForWrite);
    
    void abort_write()  _state = Unused; 
    void unlock_write()  state s = Ready;  _state.store(s); 
;

class Ring

    std::vector<Element> _elems;
    std::atomic<int64_t> _read, _write;

public:
    Ring(size_t capacity)
        : _elems(capacity), _read(0), _write(0) 

    Element * get_for_read() 
        auto i = _read.load();
        Element * ret = &_elems[ i % _elems.size() ];
        if (ret->lock_for_read()) 
            // if success, the object belongs to the caller thread as reader
            if (_read.compare_exchange_strong(i, i + 1))
                return ret;
            // Woops, reading out of order.
            ret->abort_read();
        
        return NULL;
    
    Element * get_for_write() 
        auto i = _write.load();
        Element * ret = &_elems[ i % _elems.size() ];
        if (ret->lock_for_write()) 
            // if success, the object belongs to the caller thread as writer
            if (_write.compare_exchange_strong(i, i + 1))
                return ret;
            // Woops, writing out of order.
            ret->abort_write();
        
        return NULL;
    
    void release_read(Element* e)  e->unlock_read();
    void release_write(Element* e)  e->unlock_write();
;

【讨论】:

Casey 的解决方案似乎没问题,但也许您可以简单地放宽读/写顺序,不共享 _read 和 _write。由于当数据不可用并且在 OP 解决方案中正确实现了 Element 的锁定时,您仍然在旋转,因此每个线程具有单独的读/写(如果不可用,只需增加模大小)可能会有所帮助。所以每个线程只是扫描列表试图获取锁。原子 int 可以帮助计算剩余的待办任务,以确保在所有完成后终止。共享任务队列也存在健壮和高效的解决方案,例如doc.utwente.nl/93342/1/lace.pdf【参考方案2】:

在两个共享计数器 _read 和 _write 的增量周围没有原子部分。 这对我来说看起来很糟糕,你可以毫无意义地切换另一个元素。

想象一下这个场景, 1 位读者 R1 和 1 位作者 W 合作愉快。

Reader 2 执行: Element * ret = &_elems[ _read.load() % _elems.size() ]; 并被推离cpu。

现在 R1 和 W 仍然一起玩,所以 _read 和 _write 的位置现在是任意的 w.r.t。 R2指向的元素ret。

现在在某个时间点 R2 被调度,碰巧 *ret_ 是可读的(同样可能,R1 和 W 绕着这个块转了几次)。

哎呀,如你所见,我们将读取它,并增加“_read”,但 _read 与 _ret 无关。这会创建一种孔,即尚未读取但低于 _read 索引的元素。

因此,创建临界区以确保 _read/_write 的增量在与实际锁相同的语义步骤中完成。

【讨论】:

以上是关于c++11 多读取器/多写入器队列使用原子用于对象状态和永久递增索引的主要内容,如果未能解决你的问题,请参考以下文章

多线程文件压缩

可以使用原子来减少在读取的主导多线程程序中的锁定吗?

用于多处理同步的类似队列的对象

iOS核心笔记——多线程-原子/非原子属性

Linux 上的管道读取是原子的吗(多个写入器,一个读取器)?

40.C++11多线程