如何防止此无锁堆栈函数中的未定义行为和 ABA 问题?

Posted

技术标签:

【中文标题】如何防止此无锁堆栈函数中的未定义行为和 ABA 问题?【英文标题】:How can I prevent undefined behavior and the ABA issue in this lock-free stack function? 【发布时间】:2015-11-03 00:43:24 【问题描述】:

我目前正在处理 C++11 中的无锁单链表,我的 popFront() 函数有问题 - 或者我至少应该说我知道它会在某些情况。

无论如何,这是我目前拥有的:

std::shared_ptr<T> popFront(void)

    auto p = atomic_load(&head);
    while(p && !atomic_compare_exchange_weak(&head, &p, p->next))
    
    return p ? p->data : std::shared_ptr<T>();

请注意,head 的类型为 shared_ptr

但是,我预计会有一些问题。第一种情况是两个线程正在执行popFront(),它们都读取相同的head,一个线程先完成。在第二个线程完成之前,调用者删除了指向的对象,因此第二个线程现在正在使用已删除的内存。第二个问题是经典的 ABA 问题。

这个链表背后的想法是让它是无锁的,所以我想避免在这个函数中强加锁。但不幸的是,我不确定如何解决这些问题。任何建议将不胜感激。

【问题讨论】:

不要修复这个代码,don't fix this code,写更好的代码。你为什么要让自己经历这些?即使你修复了这一切,它的性能也不会比一个简单的锁好,而且它会很脆弱、难以理解、无法修改。 很公平。我想解决此问题的原因有两个:(1) 我想了解如何修改代码以解决问题,以及 (2) 现有实现不包含我需要的功能。跨度> 然后使用锁。它的编写、理解和维护要简单得多。在大多数情况下,它的性能更好。而且您实际上可以确信它是正确的,并且不必永远担心您错过了一些可能失败的奇怪边缘情况。 @David 考虑到这样一个短操作的锁基本上是一个自旋锁(至少在我能想到的所有实现中,内核调用都很昂贵),它的作用与交换,我真的看不出“在大多数情况下”会表现得更好。我同意其他所有内容,但性能声明似乎不太可能。 让我为您搜索一下。 en.wikipedia.org/wiki/Hazard_pointer 是解决无锁结构中 ABA 问题的一种方法。 en.wikipedia.org/wiki/ABA_problem#Workarounds 提到了其他一些人。 【参考方案1】:

有很多解决方案可以设计无 ABA 问题的无锁队列。

article 应该提供一些见解,并且可以在 here 找到一些解决此问题的通用工具。

现在,关于您提到的所描述的问题:

在第二个线程完成之前,调用者删除了指向的对象,所以第二个线程现在正在使用 删除的记忆

是的,这是可能发生的,对此的解决方案是使用tagged pointers:在 32 位架构上,最后 2 (or more) 位未使用,因此它们可用于标记,在 64 位架构上,我们至少有 3 个未使用的位。

因此我们可以将指针设置为逻辑删除,但不能通过设置指针的一些未使用位来物理删除它,如下所示:

__inline struct node* setTag(struct node* p, unsigned long TAG)

    return (struct node*) ((uintptr_t)p | TAG);


__inline bool isTagged(struct node* p, unsigned long TAG)

    return (uintptr_t)p == (uintptr_t)p &  ~TAG;


__inline struct node* getUntaggedAddress(struct node* p, unsigned long TAG)

    return (struct node*)((uintptr_t)p &  ~TAG);

其中 TAG 最多为 4 个(对于 32 位架构),而在 64 位架构上最多为 8 个(2/3 或更多未使用的位,具体取决于计算机架构和字对齐方式)。

现在在进行 CAS 时,我们忽略标记指针 => 因此只对有效指针进行操作。

在队列上进行出列时,我们可以执行如下操作:

int dequeue(qroot* root)

    qnode* oldHead;

    do
    
        oldHead = root->head;

        if (isTagged(root->head)) //disregard tagged addresses
            return NULL;

        oldHead = getUntaggedAddress(root->head); //we do a CAS only if the old head was unchanged

     while (root->head.compare_exchange_strong(oldHead, oldHead->next, std::memory_order_seq_cst));

    return &(oldHead->data);

给定

typedef struct qnode

    std::atomic<qnode*> next;
    int data;
qnode;

typedef struct qroot

    std::atomic<qnode*> head; //Dequeue and peek will be performed from head
    std::atomic<qnode*> tail; //Enqueue will be performed to tail
qroot;

【讨论】:

【参考方案2】:

有助于使线程更容易的一件事是不释放内存。如果您正在使用一堆链表节点,您可能会考虑使用它们的池。您无需释放节点,而是将其返回到池中。这可以解决您的部分问题。

ABA 很简单。每次更改头指针时都需要碰撞计数器。您需要同时用指针原子地写入此计数器。如果使用 32 位寻址,请使用 64 位比较和交换 (CAS),并将计数器存储在额外的 32 位中。如果使用 64 位寻址,请避免 128 位比较和交换,因为它可能很慢(在我们的 Xenon 芯片上,任何高达 64 位的东西都很快)。由于 Windows 和 Linux 都不支持完整的 64 位寻址,因此您可以将部分 64 位用于 ABA。对于 32 位和 64 位寻址模式,我使用联合来执行此操作。

您无需计算每次更改。你只需要抓住每一个变化。即使试图用大量线程尽可能快地改变头部,它仍然很少发生。在现实生活中,它很少很少发生。 IE。你不需要算得很高。我通常使用 4 位并让它滚动。我可能会因此惹上麻烦。如果需要,可以使用更多。

在本例中,我假设为 64 位,并使用 CAS() 进行比较和交换,因此您必须将编译器使用的任何内容替换为 CAS:

typedef unsigned __int64_t U8;

struct TNode 
    TNode* m_pNext;
;

template<class T>
union THead 
    struct 
        U8  m_nABA : 4,  
            m_pNode:60;  // Windows only supports 44 bits addressing anyway.
    ;
    U8  m_n64; // for CAS
    // this constructor will make an atomic copy on intel 
    THead(THead& r)          m_n64 = r.m_n64; 
    T* Node()                return (T*)m_pNode; 
    // changeing Node bumps aba
    void Node(T* p)          m_nABA++; m_pNode = (U8)p; return this; 
;

// pop pNode from head of list.
template<class T>
T* Pop(volatile THead<T>& Head) 
    while (1)  // race loop
        // Get an atomic copy of head and call it old.
        THead<T> Old(Head);
        if (!Old.Node())
            return NULL;
        // Copy old and call it new.
        THead<T> New(Old);
        // change New's Node, which bumps internal aba
        New.Node(Old.Node()->m_pNext);
        // compare and swap New with Head if it still matches Old.
        if (CAS(&Head.m_n64, Old.m_n64, New.m_n64))
            return Old.Node(); // success
        // race, try again
    


// push pNode onto head of list.
template<class T>
void Push(volatile THead<T>& Head, T* pNode) 
    while (1)  // race loop
        // Get an atomic copy of head and call it old.
        // Copy old and call it new.
        THead<T> Old(Head), New(Old);
        // Wire node t Head
        pNode->m_pNext = New.Node();
        // change New's head ptr, which bumps internal aba
        New.Node(pNode);
        // compare and swap New with Head if it still matches Old.
        if (CAS(&Head.m_n64, Old.m_n64, New.m_n64))
            break; // success
        // race, try again
    

【讨论】:

以上是关于如何防止此无锁堆栈函数中的未定义行为和 ABA 问题?的主要内容,如果未能解决你的问题,请参考以下文章

这是C ++中的未定义行为从悬空指针调用函数

简单的无锁堆栈c ++ 11

何时在空实例上调用成员函数会导致 C++11 中的未定义行为? [复制]

无锁编程:lock-free原理;CAS;ABA问题

StructLayout 和 FieldOffset 的未定义行为

怎么防止堆栈溢出