线程 lambda 中的原子更新

Posted

技术标签:

【中文标题】线程 lambda 中的原子更新【英文标题】:Atomic update in threaded lambda 【发布时间】:2020-03-22 07:44:14 【问题描述】:

我认为在线程内以这种方式更新原子值并不好(总和有时看起来不太好)

    std::atomic<double> e(0);

    auto worker = [&] (size_t begin, size_t end, std::atomic<double> & acc) 
      double ee = 0;
      for(auto k = begin; k != end; ++k) 
        ee += something[k];
      
      acc.store( acc.load() + ee );
    ;

    std::vector<std::thread> threads(nbThreads);
    const size_t grainsize = miniBatchSize / nbThreads;

    size_t work_iter = 0;
    for(auto it = std::begin(threads); it != std::end(threads) - 1; ++it) 
      *it = std::thread(worker, work_iter, work_iter + grainsize, std::ref(e));
      work_iter += grainsize;
    
    threads.back() = std::thread(worker, work_iter, miniBatchSize, std::ref(e));

    for(auto&& i : threads) 
      i.join();
    

虽然使用锁保护似乎没问题

    std::atomic<double> e(0);
    std::mutex m;

    auto worker = [&] (size_t begin, size_t end, std::atomic<double> & acc) 
      double ee = 0;
      for(auto k = begin; k != end; ++k) 
        ee += something[k];
      
      
          const std::lock_guard<std::mutex> lock(m);
          acc.store( acc.load() + ee );
      
    ;

    std::vector<std::thread> threads(nbThreads);
    const size_t grainsize = miniBatchSize / nbThreads;

    size_t work_iter = 0;
    for(auto it = std::begin(threads); it != std::end(threads) - 1; ++it) 
      *it = std::thread(worker, work_iter, work_iter + grainsize, std::ref(e));
      work_iter += grainsize;
    
    threads.back() = std::thread(worker, work_iter, miniBatchSize, std::ref(e));

    for(auto&& i : threads) 
      i.join();
    

我是对的,我在这里缺少什么?是 std::ref(e) 的问题吗?

【问题讨论】:

【参考方案1】:

您希望加载和存储都作为原子操作发生。目前您的代码可以:

acc.store(acc.load() + ee);

现在假设一个线程在load() 执行后立即被中断(我们将加载的值称为acc_old)。另一个线程做它的事情(并因此修改acc),然后第一个线程再次运行。它不会重新加载acc,因为它已经加载了它的值。所以这个线程现在将更新acc 以包含acc_old + ee。并且 bam,错误的结果。

请改用fetch_addoperator+=。两者都保证整个加法操作的原子行为。即:

acc += ee; // or
acc.fetch_add(ee);

编辑:请注意,这些函数仅支持从 C++20 开始的浮点原子。对于整数类型,它们从 C++11 开始受支持。因此,如果您需要浮点,您可能必须坚持使用互斥锁。在这种情况下,我建议将双精度值和互斥锁包装在一个类中,这样就不会意外地以错误的方式使用它。

【讨论】:

【参考方案2】:

问题出在一行:

acc.store( acc.load() + ee );

有2个操作加载和存储,在它们之间的间隔内,另一个线程可以改变值。

不幸的是 atomic 不支持 fetch_add。

你可以试试这个:

    auto atomic_fetch_add = [](std::atomic<double>* obj, double arg)
    
        auto expected = obj->load();
        while (!atomic_compare_exchange_weak(obj, &expected, expected + arg))
            ;
        return expected;
    ;

    std::atomic<double> e(0);

    auto worker = [&] (size_t begin, size_t end, std::atomic<double> & acc) 
      double ee = 0;
      for(auto k = begin; k != end; ++k) 
        ee += something[k];
      
      // acc.store( acc.load() + ee );
      atomic_fetch_add(&acc, ee);
    ;

    std::vector<std::thread> threads(nbThreads);
    const size_t grainsize = miniBatchSize / nbThreads;

    size_t work_iter = 0;
    for(auto it = std::begin(threads); it != std::end(threads) - 1; ++it) 
      *it = std::thread(worker, work_iter, work_iter + grainsize, std::ref(e));
      work_iter += grainsize;
    
    threads.back() = std::thread(worker, work_iter, miniBatchSize, std::ref(e));

    for(auto&& i : threads) 
      i.join();
    

虽然不能保证 atomic 不使用互斥锁,但您必须检查您的实现。

【讨论】:

以上是关于线程 lambda 中的原子更新的主要内容,如果未能解决你的问题,请参考以下文章

第七章 Java中的13个原子操作类

多线程与并发Java中的12个原子操作类

Java学习笔记—多线程(原子类,java.util.concurrent.atomic包,转载)

Java并发多线程编程——原子类AtomicInteger的ABA问题及原子更新引用

JAVA多线程学习九-原子性操作类的应用

使用原子和互斥锁 c++ 在类内部进行线程化