线程 lambda 中的原子更新
Posted
技术标签:
【中文标题】线程 lambda 中的原子更新【英文标题】:Atomic update in threaded lambda 【发布时间】:2020-03-22 07:44:14 【问题描述】:我认为在线程内以这种方式更新原子值并不好(总和有时看起来不太好)
std::atomic<double> e(0);
auto worker = [&] (size_t begin, size_t end, std::atomic<double> & acc)
double ee = 0;
for(auto k = begin; k != end; ++k)
ee += something[k];
acc.store( acc.load() + ee );
;
std::vector<std::thread> threads(nbThreads);
const size_t grainsize = miniBatchSize / nbThreads;
size_t work_iter = 0;
for(auto it = std::begin(threads); it != std::end(threads) - 1; ++it)
*it = std::thread(worker, work_iter, work_iter + grainsize, std::ref(e));
work_iter += grainsize;
threads.back() = std::thread(worker, work_iter, miniBatchSize, std::ref(e));
for(auto&& i : threads)
i.join();
虽然使用锁保护似乎没问题
std::atomic<double> e(0);
std::mutex m;
auto worker = [&] (size_t begin, size_t end, std::atomic<double> & acc)
double ee = 0;
for(auto k = begin; k != end; ++k)
ee += something[k];
const std::lock_guard<std::mutex> lock(m);
acc.store( acc.load() + ee );
;
std::vector<std::thread> threads(nbThreads);
const size_t grainsize = miniBatchSize / nbThreads;
size_t work_iter = 0;
for(auto it = std::begin(threads); it != std::end(threads) - 1; ++it)
*it = std::thread(worker, work_iter, work_iter + grainsize, std::ref(e));
work_iter += grainsize;
threads.back() = std::thread(worker, work_iter, miniBatchSize, std::ref(e));
for(auto&& i : threads)
i.join();
我是对的,我在这里缺少什么?是 std::ref(e) 的问题吗?
【问题讨论】:
【参考方案1】:您希望加载和存储都作为原子操作发生。目前您的代码可以:
acc.store(acc.load() + ee);
现在假设一个线程在load()
执行后立即被中断(我们将加载的值称为acc_old
)。另一个线程做它的事情(并因此修改acc
),然后第一个线程再次运行。它不会重新加载acc
,因为它已经加载了它的值。所以这个线程现在将更新acc
以包含acc_old + ee
。并且 bam,错误的结果。
请改用fetch_add
或operator+=
。两者都保证整个加法操作的原子行为。即:
acc += ee; // or
acc.fetch_add(ee);
编辑:请注意,这些函数仅支持从 C++20 开始的浮点原子。对于整数类型,它们从 C++11 开始受支持。因此,如果您需要浮点,您可能必须坚持使用互斥锁。在这种情况下,我建议将双精度值和互斥锁包装在一个类中,这样就不会意外地以错误的方式使用它。
【讨论】:
【参考方案2】:问题出在一行:
acc.store( acc.load() + ee );
有2个操作加载和存储,在它们之间的间隔内,另一个线程可以改变值。
不幸的是 atomic 不支持 fetch_add。
你可以试试这个:
auto atomic_fetch_add = [](std::atomic<double>* obj, double arg)
auto expected = obj->load();
while (!atomic_compare_exchange_weak(obj, &expected, expected + arg))
;
return expected;
;
std::atomic<double> e(0);
auto worker = [&] (size_t begin, size_t end, std::atomic<double> & acc)
double ee = 0;
for(auto k = begin; k != end; ++k)
ee += something[k];
// acc.store( acc.load() + ee );
atomic_fetch_add(&acc, ee);
;
std::vector<std::thread> threads(nbThreads);
const size_t grainsize = miniBatchSize / nbThreads;
size_t work_iter = 0;
for(auto it = std::begin(threads); it != std::end(threads) - 1; ++it)
*it = std::thread(worker, work_iter, work_iter + grainsize, std::ref(e));
work_iter += grainsize;
threads.back() = std::thread(worker, work_iter, miniBatchSize, std::ref(e));
for(auto&& i : threads)
i.join();
虽然不能保证 atomic 不使用互斥锁,但您必须检查您的实现。
【讨论】:
以上是关于线程 lambda 中的原子更新的主要内容,如果未能解决你的问题,请参考以下文章
Java学习笔记—多线程(原子类,java.util.concurrent.atomic包,转载)