Lockfree堆栈与原子

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Lockfree堆栈与原子相关的知识,希望对你有一定的参考价值。

我需要构建一个无锁的堆栈实现。我读了this page,我理解列出的无锁推送操作的功能。

现在,我必须构建一个类似版本的pop操作。这是我到目前为止所做的,但我认为,存在一些并发问题:

template <class T>
bool CASStack<T>::pop(T& ret) {
node<T>* old_head = head.load(std::memory_order_relaxed);

if(old_head == nullptr) {
    return false;
}

// from here on we can assume that there is an element to pop
node<T>* new_head;

do {
    new_head = old_head->next;
} while(!head.compare_exchange_weak(old_head, new_head, std::memory_order_acquire, std::memory_order_relaxed));

ret = old_head->data;

return true;
}

如果我在交换后删除old_head,我想也会遇到麻烦,对吧?

编辑:更新的问题!

答案

你的node<T>* new_head = old_head->next;是一只红鲱鱼;你从不使用这个变量。

在我的评论中建议你需要把它放在do{}while(!CAS)循环中,我以为你在做head.CAS(old_head, new_head)。如果CAS必须重试,这就会产生我正在谈论的问题,即将一个可能过时的指针放入列表中。

但是你实际上正在做head.CAS(old_head, old_head->next),它每次通过循环从更新的old_head生成“所需”值。这实际上是正确的,但很难遵循,所以我建议像这样使用do{}while()

node<T>* pop(std::atomic<node<T>*> &head)
{
    // We technically need acquire (or consume) loads of head because we dereference it.
    node<T>* old_head = head.load(std::memory_order_acquire);

    node<T>* new_head;
    do {
        if(old_head == nullptr) {
           // need to re-check because every retry reloads old_head
           // pop in another thread might have emptied the list
            return nullptr;
        }

        new_head = old_head->next;
        // if head still equals old_head this implies the same relation for new_head
    } while(!head.compare_exchange_weak(old_head, new_head,
                                        std::memory_order_acquire));
    // Note the ordering change: acquire for both success and failure

    return old_head;  // defer deletion until some later time
}

是否允许在compare_exchange_weak中执行old_head->next?这仍然是原子的吗?

CAS仍然是原子的。任何编译的compare_exchange_weak本身都是原子的。但是,编译器在函数调用之前计算args,因此读取old_head->next不是CAS所执行的原子事务的一部分。它已经被单独读成一个临时的。 (使用do{}while循环中的单独变量显式执行此操作很常见。)

如果node::nextatomic<>node成员,你应该考虑你想要用于该负载的内存顺序。但是对于纯栈,它不必是原子的,因为链接列表节点在它们在栈上时永远不会被修改,只有在用正确的next指针推送之前。共享只读访问权限不是竞赛。


用作纯栈还可以减少删除问题:线程无法“窥视”头节点或遍历列表。它们只能在弹出后查看节点内部,并且pop算法确保它们拥有节点的独占所有权(并负责删除它)。

但是pop()本身需要从head节点加载。如果另一个线程与我们竞争并将该head的内存返回给操作系统,我们可能会出错。所以我们确实有一个删除问题like RCU does,就像我在评论中提到的那样。

在大多数C ++实现中,简单地重复使用内存不会成为问题:我们会读取old_head->next的垃圾值,但CAS会失败(因为head指针必须在释放旧的head头对象之前更改)所以我们永远不会对我们加载的伪造价值做任何事情。但它仍然是C ++ UB,因为我们的原子载荷与非原子商店竞争。但是编译器必须证明这种竞争确实发生在允许发出除正常asm以外的任何东西之前,并且所有主流CPU在asm中都没有任何问题。

但除非你能保证free()delete只是把记忆放在一个免费清单上,即他们不在munmaphead的负荷之间进行old_head->next,上述推理并不能使来电者安全立即删除pop的返回值。它只意味着问题不太可能(并且很难通过简单的测试来检测)。


Memory ordering

我们加载head然后期望指针指向有用的值。 (即old_head->next)。这正是memory_order_consume给我们的。但它很难使用,并且很难优化编译器只是加强它到acquire,这使得无法测试使用consume的代码。所以我们真的需要acquire我们所有负载的head

请注意,从我们弹出的节点中获取值也取决于内存排序,但我认为如果我们不需要old_head->next,我们可以使用放宽,但在CAS的success方面(我们至少需要consume,所以在实践中acquire)。

(在主流的C ++实现中,我们可能会在除了DEC Alpha AXP之外的所有体系结构上使用relaxed,这是90年代着名的弱排序RISC。编译器几乎肯定会在加载的指针上创建具有数据依赖性的代码,因为它不会有任何其他方法可以访问它需要的值。除了Alpha之外的所有“普通”硬件都免费提供mo_consume样式依赖序列。所以使用relaxed进行测试绝不会出现问题,除非你有一个罕见的Alpha模型实际上可以产生这是硬件的重新排序,以及它的一个有效的C ++ 11实现。但它仍然是“错误的”,并且可能会破坏编译时重新排序,或者我可能会遗漏某些东西而且relaxed实际上可能在实践中突破而没有内联更复杂的东西+不断传播。)

请注意,这些mo_acquire加载synchronize-with mo_release存储在推动当前head指向的对象的线程中。这可以防止old_head的非原子载荷与非原子存储器竞争到推动它的线程中的节点。

另一答案

想象一下,在加载old_head和解除引用old_head-> next之间,cpu被一个中断转移,并且很长时间没有回到这个序列(几天,几周等等)。与此同时,一些其他线程已从您的堆栈中弹出“old_head”,对其进行处理,并将其返回到堆中,并可能将其重新用于另一个对象。

它适用于'推送'的原因是'推送代码'拥有要推送的对象。对于'pop'来说并非如此 - pop正在发现该对象,然后试图获得它的所有权。要使用“无锁”,您必须能够同时执行这两项操作;这使链接列表很难,如果不是不可用的话。

相比之下,使用数组你知道'next'是'top - 1',所以:

do {
   x = stack[temp = top];
} while (cswap(&top, temp, temp-1) != temp);

很诱人。问题在于,您需要将生成计数编码为top,以便每个“top”的赋值都是唯一的:

struct uuidx { int index; very_large_int sequence; };
extern (volatile, atomic, whatever) struct uuidx top;

...
struct uuidx temp, next;
do {
    x = stack[(temp = top).index];
    next = (struct uuidx){.index = temp.index - 1,
                 .sequence = temp.sequence+1};
} while (cswap(&top, temp, next) != temp)
另一答案

这是我的解决方案:

template <class T>
bool CASStack<T>::pop(T& ret) {
    node<T>* new_head;

    // get the current head
    node<T>* old_head = head.load(std::memory_order_relaxed);

    do {
        // it is a null pointer iff our stack is empty
        if(old_head == nullptr) {
            return false;
        }

        // otherwise, we can dereference it and access its next node
        new_head = old_head->next;
    } while(!head.compare_exchange_weak(old_head, new_head, std::memory_order_acquire, std::memory_order_relaxed));

    // finally write the popped value into ret
    ret = old_head->data;
    return true;
}

我非常感谢您的评估。我知道这个代码有两个问题:

1)如果另一个线程在head.loadnullptr比较之间推送一个元素,我的算法不会弹出它。我不知道如何解决这个问题。

2)在push操作中,元素是用new创建的。如果我在delete old_head;之前添加return true;,我的代码会崩溃。所以我知道这个算法有内存泄漏。我可以申请this解决方案吗?

以上是关于Lockfree堆栈与原子的主要内容,如果未能解决你的问题,请参考以下文章

C# Alloc Free编程

代码适用于与单个 html 文件不同的堆栈片段

多线程编程之原子操作

多线程编程之原子操作

原子操作实现无锁队列

替换或删除后台堆栈上现有片段的代码不起作用