在 C++11 中以无锁方式原子交换两个 std::atomic<T*> 对象?
Posted
技术标签:
【中文标题】在 C++11 中以无锁方式原子交换两个 std::atomic<T*> 对象?【英文标题】:Atomic exchange of two std::atomic<T*> objects in a lock-free manner in C++11? 【发布时间】:2013-08-21 05:47:27 【问题描述】:以下代码是原子指针类的骨架,取自PARSEC benchmark suite for shared-memory multiprocessors 中的模拟退火应用程序。
在该应用程序中,中心数据结构是图(更具体地说,是集成电路的网表)。图中的每个节点都有一个属性来指示其物理位置。该算法产生许多线程,每个线程重复随机选择两个节点并交换它们的物理位置,如果这会为芯片带来更好的路由成本。
由于图很大,每个线程都可以选择任意一对节点,唯一可行的解决方案是无锁并发数据结构(CDS)。这就是为什么下面的AtomicPtr
类至关重要(它用于以无锁方式原子地交换指向两个物理位置对象的指针)。
函数atomic_load_acq_ptr()
是在汇编代码中定义的,与std::atomic<T*>::load(memory_order_acquire)
密切对应。
我想使用 C++11 原子来实现该 CDS。
template <typename T>
class AtomicPtr
private:
typedef long unsigned int ATOMIC_TYPE;
T *p __attribute__ ((aligned (8)));
static const T *ATOMIC_NULL;
inline T *Get() const
T *val;
do
val = (T *)atomic_load_acq_ptr((ATOMIC_TYPE *)&p);
while(val == ATOMIC_NULL);
return val;
inline void Swap(AtomicPtr<T> &X)
// Define partial order in which to acquire elements to prevent deadlocks
AtomicPtr<T> *first;
AtomicPtr<T> *last;
// Always process elements from lower to higher memory addresses
if (this < &X)
first = this;
last = &X;
else
first = &X;
last = this;
// Acquire and update elements in correct order
T *valFirst = first->Checkout(); // This sets p to ATOMIC_NULL so all Get() calls will spin.
T *valLast = last->PrivateSet(valFirst);
first->Checkin(valLast); // This restores p to valLast
;
std::atomic<T*>::exchange()
方法只能用于将裸露的T*
指针与std::atomic<T*>
对象交换。如何无锁交换两个std::atomic<T*>
对象?
我能想到的是,下面的AtomicPtr
类本身可以基于std::atomic<T*>
声明:
std::atomic<T*> p;
并将所有atomic_load_acq_ptr()
调用替换为std::atomic<T*>::load(memory_order_acquire)
,并将所有atomic_store_rel_ptr()
调用替换为std::atomic<T*>::store(memory_order_release)
。但我的第一个想法是 std::atomic<T*>
应该替换 AtomicPtr
本身,并且可能有一个聪明的方法可以直接交换 std::atomic<T*>
对象。有什么想法吗?
【问题讨论】:
在 C++11 中没有办法原子交换两个std::atomic
s 的内容。
实际上,我认为这在 x86 / x64 上是不可能的
不可能直接。但是AtomicPtr
类已经通过遵循签出\签入规则来做到这一点: 1- 任何想要进行交换的线程首先检查指针(通过编写上面称为ATOMIC_NULL
的标记值)并且,完成后,将其签入。 2- 如果指针已被签出,任何想要读取(即 Get())指针值的线程都必须继续旋转。
@AhmedNassar:为什么不用std::atomic<T*>
复制这个逻辑?
@Matthieu:这是我已经建议的唯一解决方案。我正在寻找更好的解决方案:)
【参考方案1】:
在我看来,得到你想要的更简单的方法是复制你在这里看到的逻辑。
问题是不可能跨两个原子对象进行原子操作,所以你必须遵循一个过程:
对原子排序(以避免死锁) “锁定”除最后一个以外的所有内容(递增顺序) 对最后一个原子执行操作 执行操作并一次“解锁”其他一个(递减顺序)当然,这是非常不完美的:
非原子:当您忙于锁定变量时,任何尚未锁定的变量都可能改变状态 不是无阻塞:如果由于某种原因线程在锁定变量时被阻塞,则所有其他未决线程也被阻塞;小心避免这里的死锁(如果你有其他锁) 脆弱:锁定变量后的崩溃会让您陷入困境,避免可能抛出和/或使用 RAII 来“锁定”的操作但是,在只有 2 个对象(因此要锁定一个)的情况下,它在实践中应该工作得相对较好。
最后,我想说两句:
为了锁定,您需要能够定义一个标记值,0x01
通常适用于指针。
C++ 标准不保证std::atomic<T*>
是无锁的,您可以使用std::atomic<T*>::is_lock_free()
检查您的特定实现和平台。
【讨论】:
正如我在问题中所建议的那样,移植AtomicPtr
类以使用 C++11 原子而不是自定义的程序集级函数是我唯一的解决方案。 AtomicPtr
类实际上是通过在指针持有标记值时自旋来制作自己的自旋锁。但是非常感谢您详细说明建议解决方案的缺陷。我将它用作基准,而不是在生产代码中。顺便说一句,原始代码已经完全按照您的建议定义了标记值:template<typename T> const T *AtomicPtr<T>::ATOMIC_NULL((T *)((int)NULL + 1));
@AhmedNassar:出于便携性原因,我建议您将NULL
转换为intptr_t
而不是int
; int
在 64 位代码中通常只有 32 位。【参考方案2】:
没有自旋锁的最接近的方法是:
std::atomic<T> a;
std::atomic<T> b;
a = b.exchange(a);
这对于b
是线程安全的。
a
不能同时访问。
【讨论】:
这不是原子的,因为std::atomic<T>::exchange()
接受一个裸露的T
值而不是std::atomic<T>
对象。因此,它会首先调用a
的转换函数到T
,然后将其传递给b.exchange()
。
我从来没有说过它对a
是原子的,它只是对b
的原子。在没有自旋锁的情况下,这是你能做到的最好的。【参考方案3】:
您是否检查过 CAS(比较和交换)操作?
std::atomic<T*> v;
while(!v.compare_exchange_weak(old_value,new_value, std::memory_order_release, memory_order_relaxed))
【讨论】:
但是std::atomic<T*>::compare_exchange_weak()
中的new_value
参数是一个纯T*
而不是std::atomic<T*>
对象。如果之前从std::atomic<T*>
对象中读取了那个裸露的T*
值,则交换不会是原子的。我在这里错过了什么吗?以上是关于在 C++11 中以无锁方式原子交换两个 std::atomic<T*> 对象?的主要内容,如果未能解决你的问题,请参考以下文章