如何防止此无锁堆栈函数中的未定义行为和 ABA 问题?
Posted
技术标签:
【中文标题】如何防止此无锁堆栈函数中的未定义行为和 ABA 问题?【英文标题】:How can I prevent undefined behavior and the ABA issue in this lock-free stack function? 【发布时间】:2015-11-03 00:43:24 【问题描述】:我目前正在处理 C++11 中的无锁单链表,我的 popFront()
函数有问题 - 或者我至少应该说我知道它会在某些情况。
无论如何,这是我目前拥有的:
std::shared_ptr<T> popFront(void)
auto p = atomic_load(&head);
while(p && !atomic_compare_exchange_weak(&head, &p, p->next))
return p ? p->data : std::shared_ptr<T>();
请注意,head
的类型为 shared_ptr
。
但是,我预计会有一些问题。第一种情况是两个线程正在执行popFront()
,它们都读取相同的head
,一个线程先完成。在第二个线程完成之前,调用者删除了指向的对象,因此第二个线程现在正在使用已删除的内存。第二个问题是经典的 ABA 问题。
这个链表背后的想法是让它是无锁的,所以我想避免在这个函数中强加锁。但不幸的是,我不确定如何解决这些问题。任何建议将不胜感激。
【问题讨论】:
不要修复这个代码,don't fix this code,写更好的代码。你为什么要让自己经历这些?即使你修复了这一切,它的性能也不会比一个简单的锁好,而且它会很脆弱、难以理解、无法修改。 很公平。我想解决此问题的原因有两个:(1) 我想了解如何修改代码以解决问题,以及 (2) 现有实现不包含我需要的功能。跨度> 然后使用锁。它的编写、理解和维护要简单得多。在大多数情况下,它的性能更好。而且您实际上可以确信它是正确的,并且不必永远担心您错过了一些可能失败的奇怪边缘情况。 @David 考虑到这样一个短操作的锁基本上是一个自旋锁(至少在我能想到的所有实现中,内核调用都很昂贵),它的作用与交换,我真的看不出“在大多数情况下”会表现得更好。我同意其他所有内容,但性能声明似乎不太可能。 让我为您搜索一下。 en.wikipedia.org/wiki/Hazard_pointer 是解决无锁结构中 ABA 问题的一种方法。 en.wikipedia.org/wiki/ABA_problem#Workarounds 提到了其他一些人。 【参考方案1】:有很多解决方案可以设计无 ABA 问题的无锁队列。
article 应该提供一些见解,并且可以在 here 找到一些解决此问题的通用工具。
现在,关于您提到的所描述的问题:
在第二个线程完成之前,调用者删除了指向的对象,所以第二个线程现在正在使用 删除的记忆
是的,这是可能发生的,对此的解决方案是使用tagged pointers:在 32 位架构上,最后 2 (or more) 位未使用,因此它们可用于标记,在 64 位架构上,我们至少有 3 个未使用的位。
因此我们可以将指针设置为逻辑删除,但不能通过设置指针的一些未使用位来物理删除它,如下所示:
__inline struct node* setTag(struct node* p, unsigned long TAG)
return (struct node*) ((uintptr_t)p | TAG);
__inline bool isTagged(struct node* p, unsigned long TAG)
return (uintptr_t)p == (uintptr_t)p & ~TAG;
__inline struct node* getUntaggedAddress(struct node* p, unsigned long TAG)
return (struct node*)((uintptr_t)p & ~TAG);
其中 TAG 最多为 4 个(对于 32 位架构),而在 64 位架构上最多为 8 个(2/3 或更多未使用的位,具体取决于计算机架构和字对齐方式)。
现在在进行 CAS 时,我们忽略标记指针 => 因此只对有效指针进行操作。
在队列上进行出列时,我们可以执行如下操作:
int dequeue(qroot* root)
qnode* oldHead;
do
oldHead = root->head;
if (isTagged(root->head)) //disregard tagged addresses
return NULL;
oldHead = getUntaggedAddress(root->head); //we do a CAS only if the old head was unchanged
while (root->head.compare_exchange_strong(oldHead, oldHead->next, std::memory_order_seq_cst));
return &(oldHead->data);
给定
typedef struct qnode
std::atomic<qnode*> next;
int data;
qnode;
typedef struct qroot
std::atomic<qnode*> head; //Dequeue and peek will be performed from head
std::atomic<qnode*> tail; //Enqueue will be performed to tail
qroot;
【讨论】:
【参考方案2】:有助于使线程更容易的一件事是不释放内存。如果您正在使用一堆链表节点,您可能会考虑使用它们的池。您无需释放节点,而是将其返回到池中。这可以解决您的部分问题。
ABA 很简单。每次更改头指针时都需要碰撞计数器。您需要同时用指针原子地写入此计数器。如果使用 32 位寻址,请使用 64 位比较和交换 (CAS),并将计数器存储在额外的 32 位中。如果使用 64 位寻址,请避免 128 位比较和交换,因为它可能很慢(在我们的 Xenon 芯片上,任何高达 64 位的东西都很快)。由于 Windows 和 Linux 都不支持完整的 64 位寻址,因此您可以将部分 64 位用于 ABA。对于 32 位和 64 位寻址模式,我使用联合来执行此操作。
您无需计算每次更改。你只需要抓住每一个变化。即使试图用大量线程尽可能快地改变头部,它仍然很少发生。在现实生活中,它很少很少发生。 IE。你不需要算得很高。我通常使用 4 位并让它滚动。我可能会因此惹上麻烦。如果需要,可以使用更多。
在本例中,我假设为 64 位,并使用 CAS() 进行比较和交换,因此您必须将编译器使用的任何内容替换为 CAS:
typedef unsigned __int64_t U8;
struct TNode
TNode* m_pNext;
;
template<class T>
union THead
struct
U8 m_nABA : 4,
m_pNode:60; // Windows only supports 44 bits addressing anyway.
;
U8 m_n64; // for CAS
// this constructor will make an atomic copy on intel
THead(THead& r) m_n64 = r.m_n64;
T* Node() return (T*)m_pNode;
// changeing Node bumps aba
void Node(T* p) m_nABA++; m_pNode = (U8)p; return this;
;
// pop pNode from head of list.
template<class T>
T* Pop(volatile THead<T>& Head)
while (1) // race loop
// Get an atomic copy of head and call it old.
THead<T> Old(Head);
if (!Old.Node())
return NULL;
// Copy old and call it new.
THead<T> New(Old);
// change New's Node, which bumps internal aba
New.Node(Old.Node()->m_pNext);
// compare and swap New with Head if it still matches Old.
if (CAS(&Head.m_n64, Old.m_n64, New.m_n64))
return Old.Node(); // success
// race, try again
// push pNode onto head of list.
template<class T>
void Push(volatile THead<T>& Head, T* pNode)
while (1) // race loop
// Get an atomic copy of head and call it old.
// Copy old and call it new.
THead<T> Old(Head), New(Old);
// Wire node t Head
pNode->m_pNext = New.Node();
// change New's head ptr, which bumps internal aba
New.Node(pNode);
// compare and swap New with Head if it still matches Old.
if (CAS(&Head.m_n64, Old.m_n64, New.m_n64))
break; // success
// race, try again
【讨论】:
以上是关于如何防止此无锁堆栈函数中的未定义行为和 ABA 问题?的主要内容,如果未能解决你的问题,请参考以下文章
何时在空实例上调用成员函数会导致 C++11 中的未定义行为? [复制]