为啥在已经使用 seq_cst CAS 的无锁队列中需要 atomic_thread_fence(memory_order_seq_cst)?

Posted

技术标签:

【中文标题】为啥在已经使用 seq_cst CAS 的无锁队列中需要 atomic_thread_fence(memory_order_seq_cst)?【英文标题】:Why is atomic_thread_fence(memory_order_seq_cst) needed in a lock-free queue that already uses seq_cst CAS?为什么在已经使用 seq_cst CAS 的无锁队列中需要 atomic_thread_fence(memory_order_seq_cst)? 【发布时间】:2018-08-02 04:21:44 【问题描述】:

一个无锁队列,只有一个线程执行push和pop,其他线程执行steal。

但是,我不明白为什么steal() 需要std::atomic_thread_fence(std::memory_order_seq_cst)

在我看来,steal()只有一个store操作,即_top.compare_exchange_strong,它有memory_order_seq_cst。那么,为什么它还需要一个 seq_cst 栅栏呢?

template <typename T>
class WorkStealingQueue 
public:
    WorkStealingQueue() : _bottom(1), _top(1)  
    ~WorkStealingQueue()  delete [] _buffer; 

    int init(size_t capacity) 
        if (capacity & (capacity - 1)) 
            LOG(ERROR) << "Invalid capacity=" << capacity
                       << " which must be power of 2";
            return -1;
        

        _buffer = new(std::nothrow) T[capacity];
        _capacity = capacity;
        return 0;
    

    // Steal one item from the queue.
    // Returns true on stolen.
    // May run in parallel with push() pop() or another steal().
    bool steal(T* val) 
        size_t t = _top.load(std::memory_order_acquire);
        size_t b = _bottom.load(std::memory_order_acquire);
        if (t >= b) 
            // Permit false negative for performance considerations.
            return false;
        

        do 
            std::atomic_thread_fence(std::memory_order_seq_cst);
            b = _bottom.load(std::memory_order_acquire);
            if (t >= b) 
                return false;
            
            *val = _buffer[t & (_capacity - 1)];
         while (!_top.compare_exchange_strong(t, t + 1,
                                               std::memory_order_seq_cst,
                                               std::memory_order_relaxed));
        return true;
    

    // Pop an item from the queue.
    // Returns true on popped and the item is written to `val'.
    // May run in parallel with steal().
    // Never run in parallel with push() or another pop().
    bool pop(T* val) 
        const size_t b = _bottom.load(std::memory_order_relaxed);
        size_t t = _top.load(std::memory_order_relaxed);
        if (t >= b) 
            // fast check since we call pop() in each sched.
            // Stale _top which is smaller should not enter this branch.
            return false;
        

        const size_t newb = b - 1;
        _bottom.store(newb, std::memory_order_relaxed);
        std::atomic_thread_fence(std::memory_order_seq_cst);

        t = _top.load(std::memory_order_relaxed);
        if (t > newb) 
            _bottom.store(b, std::memory_order_relaxed);
            return false;
        

        *val = _buffer[newb & (_capacity - 1)];
        if (t != newb) 
            return true;
        

        // Single last element, compete with steal()
        const bool popped = _top.compare_exchange_strong(
            t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
        _bottom.store(b, std::memory_order_relaxed);
        return popped;
    

    // Push an item into the queue.
    // Returns true on pushed.
    // May run in parallel with steal().
    // Never run in parallel with pop() or another push().
    bool push(const T& x) 
        const size_t b = _bottom.load(std::memory_order_relaxed);
        const size_t t = _top.load(std::memory_order_acquire);
        if (b >= t + _capacity)  // Full queue.
            return false;
        

        _buffer[b & (_capacity - 1)] = x;
        _bottom.store(b + 1, std::memory_order_release);
        return true;
    

private:
    DISALLOW_COPY_AND_ASSIGN(WorkStealingQueue);

    std::atomic<size_t> _bottom;
    size_t _capacity;
    T* _buffer;
    std::atomic<size_t> BAIDU_CACHELINE_ALIGNMENT _top;
;

【问题讨论】:

看起来可能是在compare_exchange_strong(..., seq_cst, relaxed)的失败情况下订购负载。与仅使用 seq_cst 作为故障排序相比,这看起来是一种非常低效的解决问题的方法,而且我不明白为什么该线程中的早期存储需要在执行 _bottom.load 之前全局可见,尤其是第一个循环的时间。 你从哪里得到这个代码?你确定它必要的,而不仅仅是矫枉过正吗?看起来只是写得不好,例如无缘无故连续两次_bottom.load 代码来自:github.com/brpc/brpc/blob/master/src/bthread/… 也许作者只是把它放进去,发现它修复了一些使用它的代码中的错误(在某些平台上?),并没有寻找有效的解决方案。或者它可能是旧版本遗留下来的;我没有看git历史。我认为它非常接近于无操作。它可以使steal() 成为一个完整的内存屏障,如果第 2 次提前退出则不会如此。但是如果第一次提前退出,thread_fence 就不会运行。所以这很奇怪。看起来像次优代码,就像在第一个 CAS 之前加载 _bottom 两次一样。而且,使用compare_exchange_strong 也很奇怪。 这段代码是否被广泛使用/有什么理由期望它得到很好的优化?因为它对我来说并不好看,至少对于 x86 而言。在某些商店中使用thread_fencemo_releaseseq_cst 相比是非常不寻常的,但这就是poprelaxed 商店之间所做的事情。我没有看到好处,尤其是对于 AArch64, 【参考方案1】:

您不必使用 seq-cst-fence,但是您必须使 _bottom 上的操作顺序一致。原因是必须保证steal中的加载操作看到pop中写入的更新值。否则,您可能会出现竞争条件,即同一物品可能会被退回两次(一次来自弹出,一次来自窃取)。

为了比较,您可以查看我的 Chase-Lev-Deque 实现:https://github.com/mpoeter/xenium/blob/master/xenium/chase_work_stealing_deque.hpp

【讨论】:

以上是关于为啥在已经使用 seq_cst CAS 的无锁队列中需要 atomic_thread_fence(memory_order_seq_cst)?的主要内容,如果未能解决你的问题,请参考以下文章

是否存在多个读取或写入线程的无锁队列?

CAS无锁队列的实现

CAS 无锁队列

基于共享内存的无锁消息队列设计

在 Folly 的无锁 SPSC 队列中使用 std::memory_order_consume

Linux(程序设计):24---无锁CAS(附无锁队列的实现)