无锁slist擦除
Posted
技术标签:
【中文标题】无锁slist擦除【英文标题】:Lock-free slist erase 【发布时间】:2019-06-20 15:40:42 【问题描述】:我一直在尝试实现无锁 slist 擦除操作,但我显然遇到了问题。不幸的是我真的很需要它。
为了解决与 ABA cmpxchg 相关的常见问题,我编写了 tagged_ptr“智能指针”类,它嵌入了一个计数器,指向存储在 std::atomic 中的指针。每次通过列表中的 CAS 更新指针时,标记值都会递增:
head.compare_exchange_weak(old, old(newptr))
存储 newptr
并从 old
增加标签。
这允许多写入器事务,但它不能解决同时更新两个指针的问题。 (例如,使用 tagged_ptr 很容易实现堆栈)
见代码here。 第 256 行是 erase() 函数:
bool erase(list_node * node)
std::atomic<tagged_ptr<list_node>>* before;
tagged_ptr<list_node> itr, after;
for(;;)
// Find previous (or head) before-node-ptr
before = &head;
itr = before->load(std::memory_order_acquire);
while(itr)
if(itr.get() == node)
break;
else if(itr.is_void())
// Thread interfered iteration.
before = &head;
itr = before->load(std::memory_order_acquire);
else
// Access next ptr
before = &itr->next;
itr = before->load(std::memory_order_acquire);
after = node->next.load(std::memory_order_acquire);
if(after.is_void() || !itr)
return false;
// Point before-ptr to after. (set head or previous node's next ptr)
if(before->compare_exchange_strong(itr, itr(after)))
// Set node->next to invalid ptr.
// list iterators will see it and restart their operation.
while(!node->next.compare_exchange_weak(after, after().set_void()))
;
return true;
// If *before changed while trying to update it to after, retry search.
在测试代码中,两个线程同时将节点推入列表,两个线程通过数据搜索随机节点并尝试擦除它们。 我遇到的错误是:
列表以某种方式变为循环(列表以空值终止),因此线程永远卡住了,迭代列表永远找不到列表的结尾。【问题讨论】:
不确定我是否正确理解了tagged_ptr
代码。假设您使用的是 32 位 CPU,那么 tag_type
是 uint32
,因此 ptr
和 tag
在内存中的同一位置重叠(它们是相同的,因为填充是 0 字节)。跨度>
如果你在 64 位 CPU 上,那么tag_type
是uint16
,ptr
的低 16 位用于存储tag
。 (这对我来说似乎是错误的)。据我记得,在 32 位机器上使用低 4 位应该是安全的,在 64 位机器上使用高 16 位(不使用从 48 到 64)
你有一些代码来测试你的tagged_ptr
实现吗?
请查看this
我已经测试了 tagged_ptr 类并且它运行正常。在 x86_64 上有一个技巧,我可以使用 cmpxchg8b 而不是 cmpxchg16b(在具有 48 位虚拟地址空间的系统上)逃脱,因为不使用高 16 位地址。 (我知道这有点不稳定!)我已经成功地使用这个类在 RPi2 上运行了一个无锁堆栈。如果 sizeof(void*) == 4 那么 sizeof(tagged_ptr我对您的tagged_ptr
实施有些怀疑。
另外,我对这部分代码有一些疑问:
else if(itr.is_void())
// Thread interfered iteration.
before = &head;
itr = before->load(std::memory_order_acquire);
假设一个线程删除了最后一个节点(您在列表中有 1 个节点并且两个线程都调用了擦除)。剩下的线程将查询头指针,它是无效的。这部分代码将进入无限循环,因为它位于while(itr)
循环中。
这部分也不是原子的:
// Point before-ptr to after. (set head or previous node's next ptr)
if(before->compare_exchange_strong(itr, itr(after)))
// Set node->next to invalid ptr.
// list iterators will see it and restart their operation.
while(!node->next.compare_exchange_weak(after, after().set_void()))
;
return true;
如果before
被第一个CAS 修改,你的node
是一个未附加的指针,仍然指向列表。另一个线程可以将其before
设置为此node
并对其进行修改并返回。
老实说,如果您的列表是循环的,那么调试并不难,只需在调试器下中断并遵循列表即可。你会看到它何时循环,他们可以弄清楚它是如何做到的。您也可以为此使用 valgrind。
tagged_ptr
类很难掌握,使用“set_void()”方法将内部ptr
设置为0xFF..F
,但如果它是“void”,则 while(itr) 中的布尔测试将返回 true。我猜这个名字应该是 invalid 而不是 void 并且它应该在 bool 运算符中返回 false 如果它是(不是 true)。如果 itr 变为“无效”(据我了解,在上面的代码中是可能的)while(itr)
将无限循环。
例如,假设你有:
Head:A -> B -> C
然后在删除一些线程后,您会得到
Thread 2 removing C : Head:A, before = &B on first iteration, exiting the while(itr) loop since itr == C (scheduled here)
Thread 1 removing B : Head:A->C and B->C (scheduled just before line 286 of your example)
Thread 2 resume, and will modify B to B->null (line 283)
and then C->null to C->yourVoid (line 286, then it's scheduled)
Then, thread 1 update B->next to B->yourVoid (useless here for understanding the issue)
You now have A->C->yourVoid
无论何时在这里迭代,你都会有一个无限循环,因为当 itr 搜索到达 C 时,下一步就是成为一个“void”并且从 head 重新开始迭代在这里不会解决任何问题,它会给出相同的结果结果,列表被破坏了。
【讨论】:
好点,但是while(itr)
应该在这种情况下比较 false,因为 head 永远不应该包含 void ptr。 (head.load()
要么有效要么为空,永远不会无效)请注意,before
ptr 在代码中要么是&head
要么是&node->next
,因此如果erase() 删除最后一个节点,cmpxchg 会将空存储到head
来自node->next
问题是发生了一些错误,列表变为循环,没有nullptr终止列表,线程卡在find()中。
是的,我承认 tagged_ptr::set_void() 的命名有点混乱,它的真正意思是“无效”。感谢您分析代码并提供我最初拥有的更结构化的视图。 :)以上是关于无锁slist擦除的主要内容,如果未能解决你的问题,请参考以下文章