无锁并发栈实现

Posted

技术标签:

【中文标题】无锁并发栈实现【英文标题】:Lock-free concurrent stack implementation 【发布时间】:2018-11-09 10:54:38 【问题描述】:

我一直在玩弄 Java 中无锁堆栈的简单实现。

编辑:请参阅下面的固定/工作版本


您是否发现此实现有任何问题?

本地语言中的类似实现似乎受到the ABA problem 的影响,但我不确定这是否是一个问题;显然没有直接在 Java 中完成指针处理,并且考虑到我关心的是堆栈的结尾,无论是 pop 还是 push,我看不出堆栈的任何非尾元素中的“丢失”任何更改会如何导致问题.

public class LockFreeStack<T extends LockFreeStack.StackItem<T>>

    public abstract static class StackItem<SELF extends StackItem<SELF>>
    
        volatile SELF next;
        // .. data ..
    

    final AtomicReference<T> top = new AtomicReference<T>(null);

    public void push(T item)
    
        T localTop;

        do 
            localTop = top.get();
            item.next = localTop;
         while(!top.compareAndSet(localTop, item));
    

    public T pop()
    
        T localTop;

        do 
            localTop = top.get();
         while(localTop != null && !top.compareAndSet(localTop, localTop.next));

        return localTop;
    

但是,这就是我不明白的。我编写了一个启动几个线程的简单测试;每个人都从预先存在的 LockFreeStack 中弹出项目,然后(后来,从弹出它的同一线程中)将它们推回。 在它弹出之后,我增加一个原子计数器,然后在推回它之前,我减少它。所以我总是希望计数器为 0(在递减之后/在推回堆栈之前)或 1(在弹出和递增之后)。

但是,事实并非如此......

public class QueueTest 
    static class TestStackItem extends LockFreeStack.StackItem<TestStackItem>
    
        final AtomicInteger usageCount = new AtomicInteger(0);

        public void inc() throws Exception
        
            int c = usageCount.incrementAndGet();

            if(c != 1)
                throw new Exception(String.format("Usage count is %d; expected %d", c, 1));
        

        public void dec() throws Exception
        
            int c = usageCount.decrementAndGet();

            if(c != 0)
                throw new Exception(String.format("Usage count is %d; expected %d", c, 0));
        
    

    public final LockFreeStack<TestStackItem> testStack = new LockFreeStack<TestStackItem>();

    public void test()
    
        final int NUM_THREADS = 4;

        for(int i = 0; i < 10; i++)
        
            TestStackItem item = new TestStackItem();
            testStack.push(item);
        

        Thread[] threads = new Thread[NUM_THREADS];
        for(int i = 0; i < NUM_THREADS; i++)
        
            threads[i] = new Thread(new TestRunner());
            threads[i].setDaemon(true);
            threads[i].setName("Thread"+i);
            threads[i].start();
        

        while(true)
        
            Thread.yield();
        

    

    class TestRunner implements  Runnable
    
        @Override
        public void run() 
            try 
                boolean pop = false;
                TestStackItem lastItem = null;
                while (true) 
                    pop = !pop;

                    if (pop) 
                        TestStackItem item = testStack.pop();
                        item.inc();
                        lastItem = item;
                     else 
                        lastItem.dec();
                        testStack.push(lastItem);
                        lastItem = null;
                    
                
             catch (Exception ex)
            
                System.out.println("exception: " + ex.toString());
            
        
    

抛出不确定的异常,例如

exception: java.lang.Exception: Usage count is 1; expected 0
exception: java.lang.Exception: Usage count is 2; expected 1

或来自另一个运行

exception: java.lang.Exception: Usage count is 2; expected 0
exception: java.lang.Exception: Usage count is 3; expected 1
exception: java.lang.Exception: Usage count is 3; expected 1
exception: java.lang.Exception: Usage count is 2; expected 1

所以这里肯定会出现一些类似竞争条件的问题。

这里出了什么问题 - 这确实与 ABA 相关(如果是,那么究竟如何?)还是我错过了其他任何东西?

谢谢!


注意:这可行,但似乎不是一个很好的解决方案。它既不是无垃圾的(StampedAtomicReference 在内部创建对象),也不是无锁的好处似乎真的有回报;在我的基准测试中,这在单线程环境中并没有真正更快,并且在同时使用 6 个线程进行测试时,它明显落后于仅在 push/pop 函数周围加锁

根据下面建议的解决方案,这确实是一个 ABA 问题,这个小改动将规避:

public class LockFreeStack<T extends LockFreeStack.StackItem<T>>

    public abstract static class StackItem<SELF extends StackItem<SELF>>
    
        volatile SELF next;
        // .. data ..
    

    private final AtomicStampedReference<T> top = new AtomicStampedReference<T>(null, 0);

    public void push(T item)
    
        int[] stampHolder = new int[1];

        T localTop;

        do 
            localTop = top.get(stampHolder);
            item.next = localTop;
         while(!top.compareAndSet(localTop, item, stampHolder[0], stampHolder[0]+1));
    

    public T pop()
    
        T localTop;
        int[] stampHolder = new int[1];

        do 
            localTop = top.get(stampHolder);
         while(localTop != null && !top.compareAndSet(localTop, localTop.next, stampHolder[0], stampHolder[0]+1));

        return localTop;
    

【问题讨论】:

@Thilo en.wikipedia.org/wiki/ABA_problem 或cs.cmu.edu/~410-s05/lectures/L31_LockFree.pdf 中描述的无锁堆栈 您是否尝试过使用带有常规 LinkedList 的 sycnh 部分?我认为您在检查 i % 2 == 1 时遇到了这个奇怪的循环中的错误 我可以建议codereview.stackexchange.com 作为这个问题的更好网站吗? @JoãoMendes 谢谢-不太确定在哪里发布最好,因为我已经知道实现(或测试)中存在错误,问题是为什么而不是 if ;-)但如果那是更好的地方,肯定会在那里尝试一下! @Bogey 好吧,如果您正在寻找特定错误的帮助,那么这里可能会更好。我以为你需要更多的全球帮助。我抓住了“你看到任何问题”这一行,并且可能误读了问题的其余部分...... :) 【参考方案1】:

在你的测试中你真的不需要这个带有“if condition”和“lastItem”的奇怪循环,你可以通过简单地弹出和推送相同的节点来重现错误。

要解决上述问题,您可以在将其推入堆栈时创建新的 TestStackItem(并将现有计数器传递给新创建的节点),或者您可以使用 AtomicStampedReference 来查看节点是否已被修改。

【讨论】:

谢谢 - 我正在努力避免产生任何垃圾(老实说,出于好奇,这更像是一种理论练习),因此积极尝试避免创建包装节点 - 虽然会研究 AtomicStampedReference ,这听起来很有希望 使用标记引用就像一个魅力,将更新原始主题以供将来参考【参考方案2】:

是的,您的堆栈存在 ABA 问题。

线程 A pop 执行 localTop = top.get() 并读取 localTop.next

其他线程弹出一堆东西并以不同的顺序放回去,但线程 A 的 localTop 仍然是最后一个推送的。

线程 A 的 CAS 成功,但它破坏了堆栈,因为它从 localTop.next 读取的值不再准确。

无锁数据结构在像 Java 这样的垃圾收集语言中比在其他语言中实现起来要容易得多。如果 push() 每次都分配一个新的堆栈项,您的 ABA 问题就会消失。然后StackItem.next 可以是最终的,整个事情变得更容易推理。

【讨论】:

Ahh .. 我的错误是认为使 StackItem.next 易失性意味着即使同时被另一个交换操作更改,它的当前值也会反映 - 但当然不是这样,因为它只读一次进入CAS循环时,确实可能在CAS成功时已经过时。谢谢!这确实解释了它!

以上是关于无锁并发栈实现的主要内容,如果未能解决你的问题,请参考以下文章

操作手册并发中使用拆分引用计数实现无锁堆栈

无锁机制实现并发访问

JUC并发编程 -- 保护共享资源(加锁实现 & 无锁实现)

Java 高并发四:无锁的实际应用

如何实现超高并发的无锁缓存?

Linux(程序设计):24---无锁CAS(附无锁队列的实现)