使用 std::atomic<int> 索引对大型数组进行异步并行化操作有多安全

Posted

技术标签:

【中文标题】使用 std::atomic<int> 索引对大型数组进行异步并行化操作有多安全【英文标题】:How safe is using an std::atomic<int> index to asynchronously parallelize operation on large array 【发布时间】:2018-11-27 20:18:03 【问题描述】:

我有一个并行例程来对大量指针中的每个对象执行一系列计算,其中计算要求每个线程能够读取所有其他对象,但只会写入一个对象。我设置它类似于下面的

#include <atomic>
#include <thread>

void threadFunction(Object** objects, int n);

std::atomic<int> idx;
int nobjects = 10000;

int main() 
  int nthreads = 4;
  Object** objects = new Object*[nobjects];

  idx = 0;
  std::thread threads[nthreads];
  for (int ii = 0; ii < nthreads; ii ++) 
    threads[ii] = std::thread(threadFunction, objects, ii);
  

  while (idx < nobjects - 1)     // Wait until all the calculations have been done

  for (int ii = 0; ii < nthreads; ii ++) 
    threads[ii].join();
  


void threadFunction(Object** objects, int n) 
  Object* current = NULL;
  while (idx < nobjects - 1) 
    current = objects[idx++];
    // do calculation
  

其中Object 是一个自定义类,但可以替换为用于这些目的的原语。我的问题是这样做有多“安全”?我知道 atomic 类型可以防止部分写入,但我怎么能确定这每次都可以工作,即使是大型数组?

这可能太具体了,但如果我能得到任何帮助,我将不胜感激。谢谢

【问题讨论】:

读写顺序如何?根据读取和写入的顺序,您将获得不同的结果,除非您制作副本,在这种情况下不再有问题 这根本不安全。其他线程可能会在 while 循环的测试和数组访问之间启动。 while 不能保证 idx 在范围内。 如果线程A在前,B将使用更新后的值,如果B在前,A将读取A更新的值 您在idx &lt; nobjects - 1objects[idx++] 之间进行了比赛。当您获得索引时,条件可能不再成立,并且您可能会超出数组的边界。 这将对缓存效率非常不利。你应该给每个线程一个数组的一部分来处理,然后你就不需要任何同步了。如果你有 4 个线程和 1000 个对象,那么线程 1 在索引 0-249 上工作,2 在 250-499 上工作,3 在 500-749 上工作,4 在 750、999 上工作,保证你是安全的。 【参考方案1】:

正如其他人在 cmets 中指出的那样,在检查循环条件和使用 idx 的值之间存在竞争条件。这可能会导致您阅读超出数组末尾的内容。您的线程函数只需要稍作调整:

void threadFunction(Object** objects, int n) 
  Object* current = NULL;
  while (true) 
    int next = idx++;
    if (next < nobjects - 1) 
      current = objects[next];
      // do calculation
     else 
      break;
    
  

一般来说,证明无锁算法是正确的很难,只能通过仔细检查代码来完成。无论如何,数组的大小与该算法的正确性无关。

使用标准库

虽然没有特别提出这个问题,但可能值得指出的是,这一切都可以通过标准库来完成(它避免了棘手的安全问题并处理了分区等问题)。类似于以下内容:

void DoCalculations(Object& obj)

  // details...


// later...

std::vector<std::unique_ptr<Object>> objects = CreateObjects();
std::for_each(
  std::execution::par,
  objects.begin(),
  objects.end(),
  [] (std::unique_ptr<Object> const& p)  DoCalculations(*p); );

【讨论】:

FWIW,你可以使用像for(int index = idx++; index &lt; nobjects - 1; index = idx++) current = objects[index]; // do calculation这样的for循环

以上是关于使用 std::atomic<int> 索引对大型数组进行异步并行化操作有多安全的主要内容,如果未能解决你的问题,请参考以下文章

std::atomic<int>:x.fetch_add(1) 和 x++ 之间的区别;

为啥不能交换 std::atomic<T> ?

C++ error: use of deleted function ‘std::atomic<short unsigned int>::atomic(const std::atomic<short

我可以制作一个线程安全的 std::atomic<vector<int>> 吗?

std::atomic_int 出现“使用已删除函数”错误

我可以将 std::atomic<int64> 放在共享内存中并期望原子操作吗?