Lockfree堆栈与原子
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Lockfree堆栈与原子相关的知识,希望对你有一定的参考价值。
我需要构建一个无锁的堆栈实现。我读了this page,我理解列出的无锁推送操作的功能。
现在,我必须构建一个类似版本的pop操作。这是我到目前为止所做的,但我认为,存在一些并发问题:
template <class T>
bool CASStack<T>::pop(T& ret) {
node<T>* old_head = head.load(std::memory_order_relaxed);
if(old_head == nullptr) {
return false;
}
// from here on we can assume that there is an element to pop
node<T>* new_head;
do {
new_head = old_head->next;
} while(!head.compare_exchange_weak(old_head, new_head, std::memory_order_acquire, std::memory_order_relaxed));
ret = old_head->data;
return true;
}
如果我在交换后删除old_head,我想也会遇到麻烦,对吧?
编辑:更新的问题!
你的node<T>* new_head = old_head->next;
是一只红鲱鱼;你从不使用这个变量。
在我的评论中建议你需要把它放在do{}while(!CAS)
循环中,我以为你在做head.CAS(old_head, new_head)
。如果CAS必须重试,这就会产生我正在谈论的问题,即将一个可能过时的指针放入列表中。
但是你实际上正在做head.CAS(old_head, old_head->next)
,它每次通过循环从更新的old_head
生成“所需”值。这实际上是正确的,但很难遵循,所以我建议像这样使用do{}while()
:
node<T>* pop(std::atomic<node<T>*> &head)
{
// We technically need acquire (or consume) loads of head because we dereference it.
node<T>* old_head = head.load(std::memory_order_acquire);
node<T>* new_head;
do {
if(old_head == nullptr) {
// need to re-check because every retry reloads old_head
// pop in another thread might have emptied the list
return nullptr;
}
new_head = old_head->next;
// if head still equals old_head this implies the same relation for new_head
} while(!head.compare_exchange_weak(old_head, new_head,
std::memory_order_acquire));
// Note the ordering change: acquire for both success and failure
return old_head; // defer deletion until some later time
}
是否允许在compare_exchange_weak中执行
old_head->next
?这仍然是原子的吗?
CAS仍然是原子的。任何编译的compare_exchange_weak
本身都是原子的。但是,编译器在函数调用之前计算args,因此读取old_head->next
不是CAS所执行的原子事务的一部分。它已经被单独读成一个临时的。 (使用do{}while
循环中的单独变量显式执行此操作很常见。)
如果node::next
是atomic<>
的node
成员,你应该考虑你想要用于该负载的内存顺序。但是对于纯栈,它不必是原子的,因为链接列表节点在它们在栈上时永远不会被修改,只有在用正确的next
指针推送之前。共享只读访问权限不是竞赛。
用作纯栈还可以减少删除问题:线程无法“窥视”头节点或遍历列表。它们只能在弹出后查看节点内部,并且pop
算法确保它们拥有节点的独占所有权(并负责删除它)。
但是pop()
本身需要从head
节点加载。如果另一个线程与我们竞争并将该head
的内存返回给操作系统,我们可能会出错。所以我们确实有一个删除问题like RCU does,就像我在评论中提到的那样。
在大多数C ++实现中,简单地重复使用内存不会成为问题:我们会读取old_head->next
的垃圾值,但CAS会失败(因为head
指针必须在释放旧的head头对象之前更改)所以我们永远不会对我们加载的伪造价值做任何事情。但它仍然是C ++ UB,因为我们的原子载荷与非原子商店竞争。但是编译器必须证明这种竞争确实发生在允许发出除正常asm以外的任何东西之前,并且所有主流CPU在asm中都没有任何问题。
但除非你能保证free()
或delete
只是把记忆放在一个免费清单上,即他们不在munmap
和head
的负荷之间进行old_head->next
,上述推理并不能使来电者安全立即删除pop
的返回值。它只意味着问题不太可能(并且很难通过简单的测试来检测)。
Memory ordering
我们加载head
然后期望指针指向有用的值。 (即old_head->next
)。这正是memory_order_consume
给我们的。但它很难使用,并且很难优化编译器只是加强它到acquire
,这使得无法测试使用consume
的代码。所以我们真的需要acquire
我们所有负载的head
。
请注意,从我们弹出的节点中获取值也取决于内存排序,但我认为如果我们不需要old_head->next
,我们可以使用放宽,但在CAS的success
方面(我们至少需要consume
,所以在实践中acquire
)。
(在主流的C ++实现中,我们可能会在除了DEC Alpha AXP之外的所有体系结构上使用relaxed
,这是90年代着名的弱排序RISC。编译器几乎肯定会在加载的指针上创建具有数据依赖性的代码,因为它不会有任何其他方法可以访问它需要的值。除了Alpha之外的所有“普通”硬件都免费提供mo_consume
样式依赖序列。所以使用relaxed
进行测试绝不会出现问题,除非你有一个罕见的Alpha模型实际上可以产生这是硬件的重新排序,以及它的一个有效的C ++ 11实现。但它仍然是“错误的”,并且可能会破坏编译时重新排序,或者我可能会遗漏某些东西而且relaxed
实际上可能在实践中突破而没有内联更复杂的东西+不断传播。)
请注意,这些mo_acquire
加载synchronize-with mo_release
存储在推动当前head
指向的对象的线程中。这可以防止old_head
的非原子载荷与非原子存储器竞争到推动它的线程中的节点。
想象一下,在加载old_head和解除引用old_head-> next之间,cpu被一个中断转移,并且很长时间没有回到这个序列(几天,几周等等)。与此同时,一些其他线程已从您的堆栈中弹出“old_head”,对其进行处理,并将其返回到堆中,并可能将其重新用于另一个对象。
它适用于'推送'的原因是'推送代码'拥有要推送的对象。对于'pop'来说并非如此 - pop正在发现该对象,然后试图获得它的所有权。要使用“无锁”,您必须能够同时执行这两项操作;这使链接列表很难,如果不是不可用的话。
相比之下,使用数组你知道'next'是'top - 1',所以:
do {
x = stack[temp = top];
} while (cswap(&top, temp, temp-1) != temp);
很诱人。问题在于,您需要将生成计数编码为top,以便每个“top”的赋值都是唯一的:
struct uuidx { int index; very_large_int sequence; };
extern (volatile, atomic, whatever) struct uuidx top;
...
struct uuidx temp, next;
do {
x = stack[(temp = top).index];
next = (struct uuidx){.index = temp.index - 1,
.sequence = temp.sequence+1};
} while (cswap(&top, temp, next) != temp)
这是我的解决方案:
template <class T>
bool CASStack<T>::pop(T& ret) {
node<T>* new_head;
// get the current head
node<T>* old_head = head.load(std::memory_order_relaxed);
do {
// it is a null pointer iff our stack is empty
if(old_head == nullptr) {
return false;
}
// otherwise, we can dereference it and access its next node
new_head = old_head->next;
} while(!head.compare_exchange_weak(old_head, new_head, std::memory_order_acquire, std::memory_order_relaxed));
// finally write the popped value into ret
ret = old_head->data;
return true;
}
我非常感谢您的评估。我知道这个代码有两个问题:
1)如果另一个线程在head.load
和nullptr
比较之间推送一个元素,我的算法不会弹出它。我不知道如何解决这个问题。
2)在push
操作中,元素是用new
创建的。如果我在delete old_head;
之前添加return true;
,我的代码会崩溃。所以我知道这个算法有内存泄漏。我可以申请this解决方案吗?
以上是关于Lockfree堆栈与原子的主要内容,如果未能解决你的问题,请参考以下文章