Java:两个 WAITING + 一个 BLOCKED 线程, notify() 导致活锁, notifyAll() 没有,为啥?

Posted

技术标签:

【中文标题】Java:两个 WAITING + 一个 BLOCKED 线程, notify() 导致活锁, notifyAll() 没有,为啥?【英文标题】:Java: two WAITING + one BLOCKED threads, notify() leads to a livelock, notifyAll() doesn't, why?Java:两个 WAITING + 一个 BLOCKED 线程, notify() 导致活锁, notifyAll() 没有,为什么? 【发布时间】:2015-01-14 14:52:44 【问题描述】:

当我偶然发现一些我不理解的行为时,我试图使用 Java 同步“原语”(同步、wait()、notify())来实现类似于 Java 的有界 BlockingQueue 接口。

我创建了一个能够存储 1 个元素的队列,创建了两个等待从队列中获取值的线程,启动它们,然后尝试在主线程的同步块中将两个值放入队列中。大多数情况下它都可以工作,但有时等待值的两个线程似乎开始互相唤醒,并且不让主线程进入同步块。

这是我的(简化的)代码:

import java.util.LinkedList;
import java.util.Queue;

public class LivelockDemo 
    private static final int MANY_RUNS = 10000;

    public static void main(String[] args) throws InterruptedException 
        for (int i = 0; i < MANY_RUNS; i++)  // to increase the probability
            final MyBoundedBlockingQueue ctr = new MyBoundedBlockingQueue(1);

            Thread t1 = createObserver(ctr, i + ":1");
            Thread t2 = createObserver(ctr, i + ":2");

            t1.start();
            t2.start();

            System.out.println(i + ":0 ready to enter synchronized block");
            synchronized (ctr) 
                System.out.println(i + ":0 entered synchronized block");
                ctr.addWhenHasSpace("hello");
                ctr.addWhenHasSpace("world");
            

            t1.join();
            t2.join();

            System.out.println();
        
    

    public static class MyBoundedBlockingQueue 
        private Queue<Object> lst = new LinkedList<Object>();;

        private int limit;

        private MyBoundedBlockingQueue(int limit) 
            this.limit = limit;
        

        public synchronized void addWhenHasSpace(Object obj) throws InterruptedException 
            boolean printed = false;
            while (lst.size() >= limit) 
                printed = __heartbeat(':', printed);
                notify();
                wait();
            
            lst.offer(obj);
            notify();
        

        // waits until something has been set and then returns it
        public synchronized Object getWhenNotEmpty() throws InterruptedException 
            boolean printed = false;
            while (lst.isEmpty()) 
                printed = __heartbeat('.', printed); // show progress
                notify();
                wait();
            
            Object result = lst.poll();
            notify();
            return result;
        

        // just to show progress of waiting threads in a reasonable manner
        private static boolean __heartbeat(char c, boolean printed) 
            long now = System.currentTimeMillis();
            if (now % 1000 == 0) 
                System.out.print(c);
                printed = true;
             else if (printed) 
                System.out.println();
                printed = false;
            
            return printed;
        
    

    private static Thread createObserver(final MyBoundedBlockingQueue ctr,
            final String name) 
        return new Thread(new Runnable() 
            @Override
            public void run() 
                try 
                    System.out.println(name + ": saw " + ctr.getWhenNotEmpty());
                 catch (InterruptedException e) 
                    e.printStackTrace(System.err);
                
            
        , name);
    

这是我在“阻塞”时看到的:

(skipped a lot)

85:0 ready to enter synchronized block
85:0 entered synchronized block
85:2: saw hello
85:1: saw world

86:0 ready to enter synchronized block
86:0 entered synchronized block
86:2: saw hello
86:1: saw world

87:0 ready to enter synchronized block
............................................

..........................................................................

..................................................................................
(goes "forever")

但是,如果我将 addWhenHasSpace 和 getWhenNotEmpty 方法的 while(...) 循环中的 notify() 调用更改为 notifyAll(),它“总是”通过。

我的问题是:为什么在这种情况下 notify() 和 notifyAll() 方法的行为会有所不同,以及 notify() 的行为为什么会这样?

我希望这两种方法在这种情况下的行为方式相同(两个线程正在等待,一个被阻塞),因为:

    在我看来,在这种情况下 notifyAll() 只会唤醒另一个线程,与 notify() 相同; 看起来唤醒线程的方法的选择会影响被唤醒的线程(我猜想变成 RUNNABLE)和主线程(已被 BLOCKED)稍后竞争的方式锁——不是我对 javadoc 的期望,也不是在互联网上搜索该主题。

或者也许我完全做错了什么?

【问题讨论】:

你为什么要循环调用notify() wait()?您很可能需要两台显示器 - 一台用于“有东西要消耗”,另一台用于“有空间要填充”。 谢谢,你让我意识到我在做一件愚蠢的事情,不断唤醒线程,同时等待 任何 线程的条件没有改变。我专注于这个问题并且没有看到(明显的)更好的方法。仍然给我留下一个问题,为什么 notify() 和 notifyAll() 在这种情况下表现不同,但由于有更好的方法可以做到这一点,这个问题只是理论上的兴趣。 【参考方案1】:

不用太深入研究您的代码,我可以看到您正在使用单个条件变量来实现一个包含一个生产者和多个消费者的队列。这是一个麻烦的秘诀:如果只有一个条件变量,那么当消费者调用notify() 时,无法知道它会唤醒生产者还是唤醒另一个消费者。

有两种方法可以摆脱这个陷阱:最简单的方法是始终使用notifyAll().

另一种方法是停止使用synchronizedwait()notify(),而是使用java.util.concurrent.locks 中的工具。

单个 ReentrantLock 对象可以为您提供两个(或更多)条件变量。一个生产者专用通知消费者,一个消费者专用通知生产者。

注意:当您切换到使用 ReentrantLocks 时,名称会发生​​变化:o.wait() 变为 c.await()o.notify() 变为 c.signal()

【讨论】:

【参考方案2】:

似乎存在某种使用内在锁定的公平/闯入 - 可能是由于一些优化。我猜,本机代码检查当前线程是否已通知监视器它即将等待并允许它获胜。

synchronized 替换为ReentrantLock,它应该可以正常工作。这里的不同之处在于ReentrantLock 如何处理它已通知的锁的服务员。


更新:

有趣的发现在这里。你看到的是main线程进入之间的竞争

        synchronized (ctr) 
            System.out.println(i + ":0 entered synchronized block");
            ctr.addWhenHasSpace("hello");
            ctr.addWhenHasSpace("world");
        

而另外两个线程进入它们各自的synchronized 区域。如果主线程在两者中的至少一个之前没有进入其同步区域,您将体验到您所描述的这种活锁输出。

似乎正在发生的事情是,如果两个消费者线程都首先命中同步块,它们将在notifywait 之间相互乒乓。 JVM 可能会在线程被阻塞时将等待优先级的线程授予监视器。

【讨论】:

“当线程被阻塞时,JVM 可能会将等待优先级的线程授予监视器。” — 看起来像这样,但仅当 notify() 是使用过,这对我来说仍然是个谜,因为 notifyAll() 可能不会以任何方式影响 BLOCKED 主线程......

以上是关于Java:两个 WAITING + 一个 BLOCKED 线程, notify() 导致活锁, notifyAll() 没有,为啥?的主要内容,如果未能解决你的问题,请参考以下文章

如何在登录页面中使用 BLoC

转:java线程状态说明,Jstack线程状态BLOCKED/TIMED_WAITING/WAITING解释

9.Java常用类(waiting)

Flutter BloC 模式:基于另一个 BloC 的流更新 BloC 流

线程处于 WAITING 状态:java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method)

对 Bloc 使用单例