从右到左 - 在链后递增作为回报

Posted

技术标签:

【中文标题】从右到左 - 在链后递增作为回报【英文标题】:right to left - increment after chain in return 【发布时间】:2021-01-29 15:18:27 【问题描述】:

我正在尝试构建单个生产者/单个消费者无锁线程安全环形缓冲区。

这是piece的代码:

#include <iostream>

struct RingBuffer 
    static constexpr size_t S = 2;
    
    int data[S];
    size_t start = 0;
    size_t end = 0;

    size_t mask(size_t i) const 
        return i & (S - 1);
    

    bool full() const 
        return end - start == S;
    
    size_t size() const 
        return end - start;
    
    
    void push(int t) 
        size_t i = mask(end);
        data[i] = t;
        end++;
    
    int shift() 
        return data[mask(start++)];
    
;
 
int main()

    RingBuffer ringBuffer;
    
    // emulate t1 that will write
    if (!ringBuffer.full()) 
        int dataToWrite = 10;
        ringBuffer.push(dataToWrite);
    
    
    // emulate t2 that will read
    if (ringBuffer.size() > 0) 
        int dataRead = ringBuffer.shift();
        std::cout << dataRead << std::endl;
    

缓冲区的写入将由单个 t1 线程执行。从缓冲区读取将由单个 t2 线程执行。

就我所学的而言,这里唯一的并发问题可能出在shift 方法中:

return data[mask(start++)];

因为操作的顺序必须是:

    用当前的start 值做mask() 在第 1 点返回的索引处返回 data[] 比,递增start

但代码实际上会执行 1-3-2,而不是 1-2-3。 问题是:

    用这种代码可以到 1-2-3 吗? 使用-O3 优化(gcc),可以改变操作的顺序,使整个未定义? (即在push() 上,将end++ 移动到data[i] = t 之前?)

【问题讨论】:

RingBuffer 的任何部分都不是线程安全的。从多个线程同时调用其方法的程序实际上可以保证包含数据竞争,因此会表现出未定义的行为,除非它提供自己的同步。简单地在一个线程中读取一个非原子变量并在另一个线程中写入它已经是一场数据竞争。例如,编译器可以简单地将 startend 放入寄存器中,并且不再查询它们的内存位置,因为无竞争的程序无法在背后修改它们。 @Igor Tandetnik 可以很容易地修复设置开始和结束原子。完毕。现在:它是线程安全的吗?请注意,我只有单个消费者/单个生产者,没有多个... 你想让shiftpush对称,最后加上start++。正如所写的那样,你增加得太早了,允许写入者在读取器尝试访问它时覆盖元素 - 数据竞争。除此之外,我觉得还可以。 @IgorTandetnik 你会如何解决这个问题?返回后如何增加“start++”? int value = data[mask(start)]; start++; return value; 【参考方案1】:

通常,使用线程安全的环形缓冲区,也需要以线程安全的方式访问它。在main() 中,对full() 的检查与push() 是分开的,这使得即使是完美的ringBuffer 也无法确保线程安全。由于一个生产者和一个消费者的限制,这种方法没有问题。但是,环形缓冲区本身的一致性取决于编写良好的生产者和消费者。例如,如果生产者连续 5 次调用 push() 而不在每次之前检查 full(),那么即使使用单个生产者和消费者,将会覆盖永远不会读取的值。 p>

解决线程安全问题

线程安全问题是双重的。首先,在一个线程中存储在data 中的值不一定在另一个线程中可见,这要归功于现代语言和CPU 的memory model。对于startend 描述的缓冲区的状态 也是如此。此外,对 state 的访问是不受保护的,这为编译器和 CPU 留下了进一步搞砸事情的空间,导致 race-conditions!

出于性能原因,允许编译器和 CPU 重新排序指令并在本地缓存内存中的值。这意味着您的程序不一定以与源代码相同的顺序执行,并且并非所有“内存”值都直接从内存加载或存储到内存。输入从多线程角度进行推理的需求,您就进入了一个非常有趣的专业领域。

幸运的是,C++ 提供了语言特性来修正这些问题。恕我直言,学习atomic 所提供的东西,尤其是atomic_thread_fence、atomic_flag 和atomic types,是值得的!

无锁实现

不使用锁需要权衡push()shift()“失败的频率比必要的多一点”以确保值不会被覆盖或不会读取垃圾。一般来说,这很复杂,很难解释。幸运的是,一个生产者和一个消费者的限制以及startend 只是增加,这种情况可以更容易地解决。

template<typename T, size_t S>
class SingleProducerSingleConsumerLockFreeQueue 
    static_assert(S > 0);

    std::atomic_size_t start = 0;
    std::atomic_size_t end = 0;
    T data[S];

    static size_t mask(size_t i) 
        return i % S; // compiler optimizes for S == 2^N
    

public:
    size_t size() const noexcept  return end - start; 
    bool empty() const noexcept  return size() == 0; 
    bool full() const noexcept  return size() == S; 

    /**
     * Pushes \c value on the queue, which fails if the queue is full.
     * @param value The value to push.
     * @returns \c true if push was successful, \c false otherwise
     */
    bool push(const T &value) 
        // First load the end offset that we potentially change.
        size_t wr = end.load();
        // Now load the start offset that we will not change
        size_t rd = start.load();
        // Check if the buffer is full. If the other thread calls shift()
        // and increases start, the worst thing that happens is that we
        // return false here while we *could* have written a value, which
        // is better than overwriting an value tat is not yet start.
        if (wr - rd >= S) 
            return false;
        
        data[mask(wr)] = value;
        // Now we update the end offset, where the fence makes sure
        // that information in the data array becomes visible for
        // the thread that uses shift after end.store().
        std::atomic_thread_fence(std::memory_order_release);
        end.store(wr + 1);
        return true;
    

    /**
     * Shifts a value off the queue into \c value, which fails if the queue is empty.
     * @param value Contains the shifted value on success.
     * @returns \c true if shift was successful, \c false otherwise
     */
    bool shift(T &value) 
        // First load the offset that we potentially change.
        size_t rd = start.load();
        // Now load the offset that we will not change
        size_t wr = end.load();
        // Check if the buffer is empty. If the other threads calls
        // push() and increases end, the worst thing that happens is
        // that we return false here and that we *could* have written
        // a value, which is better than reading a value not yet written.
        if (wr <= rd) 
            return false;
        
        // Ensure that information that was written by push() before
        // fetching end.load(), is visible here.
        std::atomic_thread_fence(std::memory_order_acquire);
        value = data[mask(rd)];
        // Update the start offset.
        start.store(rd + 1);
        return true;
    
;

这可以在main()中应用如下:

while (ringBuffer.push(10));
int readValue;
while (ringBuffer.shift(readValue))  
    std::cout << readValue << std::endl;

使用自旋锁?

根据您对“无锁”的定义,可以允许使用spinlock 来保护环形缓冲区状态,使用繁忙循环设置标志if并且只有在尚未设置时。

使用自旋锁的优点是生产者和消费者线程的数量并不重要:实现服务于更一般的用例。缺点是busy waiting。虽然忙碌等待通常被认为是一种糟糕的模式,但消费者和生产者的设计和上下文会极大地影响这是否是一个问题。想象一个完全无锁的环形缓冲区和一个消费者,如下所示:

while(true) 
  int read;
  while (!ringBuffer.shift(read)); // <!-- busy loop!
  do_something_with(read);

忙等待已从环形缓冲区本身转移到消费者代码,这仍然会暂停进度,直到有东西可用。在这种情况下,使用确实使用自旋锁的环形缓冲区可能没问题。

自旋锁用于保证只有一个线程可以改变环形缓冲区的状态,由start,end组成。它为此使用atomic_flag。为了确保写入data 元素的数据对其他线程可见,使用atomic_thread_fence。这是省略了full()size() 方法的代码。

// be sure to #include <atomic> 
class SingleProducerSingleConsumerSpinLockQueue 
  static constexpr size_t S = 2;

  size_t start = 0;
  size_t end = 0;
  std::atomic_flag flag = false; // flag that is used for a busy/spin lock
  int data[S];

  class SpinlockAndFence 
    std::atomic_flag &flag_;

  public:
    SpinlockAndFence(std::atomic_flag &flag) : flag_(flag) 
      // loop until we can set the flag, so other threads keep looping!
      while (flag_.test_and_set(std::memory_order_relaxed))
        ;
      // Ensure that all data that was written in the scope of a
      // SpinlockAndFence in another thread is visible to the scope of this
      // SpinlockAndFence.
      std::atomic_thread_fence(std::memory_order_acquire);
    
    ~SpinlockAndFence() 
      // Clear the flag to allow other threads to set it and ensure that all
      // data that was written in the scope of this SpinlockAndFence is visible
      // to the scope of a SpinlockAndFence in another thread.
      std::atomic_thread_fence(std::memory_order_release);
      flag_.clear(std::memory_order_relaxed);
    
  ;

  static size_t wrap(size_t i)  // static, as it does not use instance values.
    return i % S;  // Compiler optimizes % for & if S is a power of two.
  

public:
  size_t size() const noexcept  
    SpinlockAndFence fence(flag);
    return end - start; 
  
  bool empty() const noexcept  return size() == 0; 
  bool full() const noexcept  return size() == S; 

  /**
   * Pushes \c value on the queue, which fails if the queue is full.
   * @param value The value to push.
   * @returns \c true if push was successful, \c false otherwise
   */
  bool push(int value) 
    SpinlockAndFence fence(flag);
    if (end - start >= S) 
      return false;
     else 
      data[wrap(end++)] = value;
    
    return true;
  
  /**
   * Shifts a value off the queue into \c value, which fails if the queue is empty.
   * @param value Contains the shifted value on success.
   * @returns \c true if shift was successful, \c false otherwise
   */
  bool shift(int &value) 
    SpinlockAndFence fence(flag);
    if (end - start == 0) 
      return false;
     else 
      value = data[wrap(start++)];
    
    return true;
  
;

从实际的push()shift() 中拆分full()size() 可以工作。但它仍然需要在full()push() 中都加锁,因为该锁还使得变量startend 在两个线程中都可见。然而,另一种解决方案是公开 SpinlockAndFence,这会将很多责任放在调用站点。

【讨论】:

关于第一个陈述,这并不完全正确,取决于我所说的上下文:the check for full() is separate from push(), which makes it impossible for even a perfect ringBuffer to ensure thread-safety。如前所述,我的案例是单一的消费者/生产者场景。如果生产者检查它是否已满,一旦检查,它就永远不会在推送时“满”,因为它是唯一推送的演员。所以在这种情况下没有其他人可以“推动”,除了自己:) @markzz 我同意这并不完全正确,应该说明这是一个不好的模式一般来说,但在这种情况下是允许的。 ;-) 我不知道如果 wr - rd &gt;= S)wr &lt;= rd 的评估之后的所有内容都转移到单独的 fullempty 方法,代码是否会变得更清晰。加上未来出错的可能性,例如,如果单个消费者或单个生产者要求被删除。 @markzzz,我更改了第一段以反映您的回复。

以上是关于从右到左 - 在链后递增作为回报的主要内容,如果未能解决你的问题,请参考以下文章

IOS:UIViewAnimationTransitionCurlUp 从右到左

使用 CATransition 从右到左并返回?

从右到左滚动文字效果

从右到左滑动抽屉菜单

如何使用 CGAffineTransform Scale 从右到左添加动画和放大动画

带有从右到左单选按钮的离子警报