使用 std::atomic<int> 索引对大型数组进行异步并行化操作有多安全
Posted
技术标签:
【中文标题】使用 std::atomic<int> 索引对大型数组进行异步并行化操作有多安全【英文标题】:How safe is using an std::atomic<int> index to asynchronously parallelize operation on large array 【发布时间】:2018-11-27 20:18:03 【问题描述】:我有一个并行例程来对大量指针中的每个对象执行一系列计算,其中计算要求每个线程能够读取所有其他对象,但只会写入一个对象。我设置它类似于下面的
#include <atomic>
#include <thread>
void threadFunction(Object** objects, int n);
std::atomic<int> idx;
int nobjects = 10000;
int main()
int nthreads = 4;
Object** objects = new Object*[nobjects];
idx = 0;
std::thread threads[nthreads];
for (int ii = 0; ii < nthreads; ii ++)
threads[ii] = std::thread(threadFunction, objects, ii);
while (idx < nobjects - 1) // Wait until all the calculations have been done
for (int ii = 0; ii < nthreads; ii ++)
threads[ii].join();
void threadFunction(Object** objects, int n)
Object* current = NULL;
while (idx < nobjects - 1)
current = objects[idx++];
// do calculation
其中Object
是一个自定义类,但可以替换为用于这些目的的原语。我的问题是这样做有多“安全”?我知道 atomic
类型可以防止部分写入,但我怎么能确定这每次都可以工作,即使是大型数组?
这可能太具体了,但如果我能得到任何帮助,我将不胜感激。谢谢
【问题讨论】:
读写顺序如何?根据读取和写入的顺序,您将获得不同的结果,除非您制作副本,在这种情况下不再有问题 这根本不安全。其他线程可能会在 while 循环的测试和数组访问之间启动。while
不能保证 idx
在范围内。
如果线程A在前,B将使用更新后的值,如果B在前,A将读取A更新的值
您在idx < nobjects - 1
和objects[idx++]
之间进行了比赛。当您获得索引时,条件可能不再成立,并且您可能会超出数组的边界。
这将对缓存效率非常不利。你应该给每个线程一个数组的一部分来处理,然后你就不需要任何同步了。如果你有 4 个线程和 1000 个对象,那么线程 1 在索引 0-249 上工作,2 在 250-499 上工作,3 在 500-749 上工作,4 在 750、999 上工作,保证你是安全的。
【参考方案1】:
正如其他人在 cmets 中指出的那样,在检查循环条件和使用 idx
的值之间存在竞争条件。这可能会导致您阅读超出数组末尾的内容。您的线程函数只需要稍作调整:
void threadFunction(Object** objects, int n)
Object* current = NULL;
while (true)
int next = idx++;
if (next < nobjects - 1)
current = objects[next];
// do calculation
else
break;
一般来说,证明无锁算法是正确的很难,只能通过仔细检查代码来完成。无论如何,数组的大小与该算法的正确性无关。
使用标准库
虽然没有特别提出这个问题,但可能值得指出的是,这一切都可以通过标准库来完成(它避免了棘手的安全问题并处理了分区等问题)。类似于以下内容:
void DoCalculations(Object& obj)
// details...
// later...
std::vector<std::unique_ptr<Object>> objects = CreateObjects();
std::for_each(
std::execution::par,
objects.begin(),
objects.end(),
[] (std::unique_ptr<Object> const& p) DoCalculations(*p); );
【讨论】:
FWIW,你可以使用像for(int index = idx++; index < nobjects - 1; index = idx++) current = objects[index]; // do calculation
这样的for循环以上是关于使用 std::atomic<int> 索引对大型数组进行异步并行化操作有多安全的主要内容,如果未能解决你的问题,请参考以下文章
std::atomic<int>:x.fetch_add(1) 和 x++ 之间的区别;
C++ error: use of deleted function ‘std::atomic<short unsigned int>::atomic(const std::atomic<short