阻塞与唤醒,等待队列的舞台

Posted 毛奇志

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了阻塞与唤醒,等待队列的舞台相关的知识,希望对你有一定的参考价值。

一、前言

上篇的文章中我们介绍了AQS源码中lock方法和unlock方法,这两个方法主要是用来解决并发中互斥的问题,这篇文章我们主要介绍AQS中用来解决线程同步问题的await方法、signal方法和signalAll方法,这几个方法主要对应的是synchronized中的wait方法、notify方法和notifAll方法。

二、await()与signal()/signalAll()的实际应用

2.1 await()与signal()/signalAll()的实际应用

我们实现一个阻塞的队列。

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyBlockedQueue<T> 
    final Lock lock = new ReentrantLock();  // 使用Lock,生产者和消费者是竞争同一个锁,使用synchronized,生产者和消费者竞争的是同一个锁对象
    // 条件变量:队列不满
    final Condition notFull = lock.newCondition();
    // 条件变量:队列不空
    final Condition notEmpty = lock.newCondition();
    private volatile List<T> list = new ArrayList<>();

    // 入队
    void enq(T x) 
        lock.lock();
        try 
            while (list.size() == 10) 
                // 等待队列不满
                try 
                    notFull.await();
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
            // 省略入队操作
            list.add(x);
            // 入队后, 通知可出队
            notEmpty.signal();
         finally 
            lock.unlock();
        
    

    // 出队
    void deq() 
        lock.lock();
        try 
            while (list.isEmpty()) 
                // 等待队列不空
                try 
                    notEmpty.await();
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
            list.remove(0);
            // 出队后,通知可入队
            notFull.signal();
         finally 
            lock.unlock();
        
    

    public List<T> getList() 
        return list;
    


    public static void main(String[] args) throws InterruptedException 
        MyBlockedQueue<Integer> myBlockedQueue = new MyBlockedQueue<>();
        Thread thread1 = new Thread(new Runnable() 
            @Override
            public void run() 
                for (int i = 0; i < 20; i++) 
                    myBlockedQueue.enq(i);
                
            
        );
        Thread thread2 = new Thread(new Runnable() 
            @Override
            public void run() 
                for (int i = 0; i < 10; i++) 
                    myBlockedQueue.deq();
                
            
        );
        thread1.start();
        thread2.start();
        try 
            Thread.sleep(3000);
         catch (InterruptedException e) 
            e.printStackTrace();
        
        System.out.println(Arrays.toString(myBlockedQueue.getList().toArray()));
    

运行的结果如下(输出的是后面的10位):

[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

我们可以看到condition在多线程的中使用,类似于实现了线程之前的通信:
(1) 当某个条件满足的时候,执行某个线程中操作;
(2) 当某个条件不满足的时候,将当前的线程挂起,等待这个条件满足的时候,其他的线程唤醒当前线程。

在写 生产者-消费者 模型的时候,一个常见的易错点一定要记住, 如果使用Lock,生产者和消费者是竞争同一个锁,如果使用synchronized,生产者和消费者竞争的是同一个锁对象。

2.2 ConditionObject类和Node类

2.2.1 ConditionObject类

ConditionObject类的属性

public class ConditionObject implements Condition, java.io.Serializable 
    private static final long serialVersionUID = 1173984872572414699L;   // 实现Serializable接口,显式指定序列化字段
    private transient Node firstWaiter;   // 作为Node类型,指向等待队列中第一个节点
    private transient Node lastWaiter;    // 作为Node类型,指向等待队列中最后一个节点
    private static final int REINTERRUPT =  1;  // reinterrupt 设置为final变量,命名易懂
    private static final int THROW_IE    = -1;  // throw InterruptedException

ConditionObject类的方法

值得注意的是,synchronized + wait() + notify() 等效于 lock + await() + signal()。所以,Condition 等价于 wait()/notify(),Condition是JUC包中实现的wait()+notify()。

另一个需要注意的地方是,锁池和等待池是独立的两个东西,wait() 阻塞进入等待池并不是同步代码块的结束,当前线程释放锁,但是同步代码块并没有结束,一旦被唤醒,还是要从阻塞的地方开始执行,执行未完成的同步代码块,知道同步代码块结束,才释放同步锁,进入锁池。

2.2.2 Node类

对于Node节点,属性包括七个(重点是前面五个)

volatile int waitStatus; //当前节点等待状态
volatile Node prev; //上一个节点
volatile Node next; //下一个节点
volatile Thread thread; //节点中的值
Node nextWaiter; //下一个等待节点
static final Node SHARED = new Node();  //指示节点共享还是独占,默认初始是共享
static final Node EXCLUSIVE = null;

(1) 处在同步队列中使用到的属性(加锁、解锁)包括:next、prev、thread、waitStatus,所以同步队列是双向非循环链表,涉及的类变量AbstractQueuedSynchronizer类中的head和tail,分别指向同步队列中的头结点和尾节点。

(2) 处在等待队列中使用到的属性(阻塞、唤醒)包括:nextWaiter、thread、 waitStatus,所以等待队列是单向非循环链表,涉及的类变量ConditionObject类中的firstWaiter和lastWaiter,分别指向等待队列中的头结点和尾节点。

(3) AQS队列是工作队列、同步队列,是非循环双向队列:当使用到head tail的时候,就说AQS队列建立起来了,单个线程不使用到head tail,所以AQS队列没有建立起来;

(4) 等待队列是非循环单向队列:当使用firstWaiter lastWaiter的时候,就说等待队列建立起来了。

(5) lock()和unlock()就是操作同步队列:lock()将线程封装到节点里面(此时,节点使用到的属性是thread、nextWaiter、waitStatus),放到同步队列/AQS队列中,unlock()将存放线程的节点从同步队列中拿出来,表示这个线程工作完成。

(6) await()和signal()就是操作等待队列:await()将线程封装到节点里面(此时,节点使用到的属性是thread prev next waitStatus),放到等待队列里面,signal()从等待队列中拿出元素。

问题:为什么负责同步队列的head和tail在AbstractQueuedSynchronizer类中,但是负责等待队列的firstWaiter和lastWaiter在ConditionObject类中?
回答:
(1) 对于线程同步互斥,是直接通过ReentrantLock类对象lock.lock()、lock.unlock()实现的,而ReentrantLock类对象是调用AQS类实现加锁解锁的,所以负责同步队列的head和tail在AbstractQueuedSynchronizer类中;
(2) 对于线程阻塞和唤醒,是通过ReentrantLock类对象lock.newCondition()得到一个对应,condition引用指向这个对象,然后condition.await() condition.signal()实现的,所以负责等待队列的firstWaiter和lastWaiter在ConditionObject类中。

三、await()源码

3.1 Condition.await()执行图

我们再来看看await的源码,具体如下图:

对于上图的解释:
第一个方法插入到等待队列中,第二个方法释放同步锁,第三个方法阻塞当前线程,三个方法是一个整体,不能分开。比如第二个方法先于第三个方法,表示先释放同步锁,再挂起线程,目的是为了避免当前线程没有释放的锁的时候,然后就被挂起,从而导致其他的线程获取不到锁,亦或者导致死锁的情况。

整体流程详细:

第一步,如果某个线程的调用了await的方法,会将这个线程通过CAS和尾插法的方式将这个等待的线程添加到AQS的等待队列中去(通过CAS和尾插法的方式是指:在cas保证线程安全的情况下,使用尾插法三步将这个线程放到一个Node结点中,插入到AQS的等待队列),对应代码Node node = addConditionWaiter();

第二步,将当前的线程进行解锁(对当前线程解锁的目的是为了避免这个线程没有释放的锁的时候,然后就被挂起,从而导致其他的线程获取不到锁,亦或者导致死锁的情况),对应代码 int savedState = fullyRelease(node);

第三步,将当前的线程进行park(park之后这个线程只能被动地等待其他的线程调用signal方法将当前的线程unpark),对应代码 LockSupport.park(this);

while (!isOnSyncQueue(node))     // addConditionWaiter()返回的node,不在同步队列中,就park
   LockSupport.park(this);    // 将当前的线程进行park,this表示AbstractQueuedSynchronizer对象,表示当前线程
   if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
       break;

小结:第一个方法使用数据结构插入到等待队列中;
第二个方法使用unpark释放同步锁:unparkSuccessor(h);
第三个方法使用park阻塞当前线程:LockSupport.park(this);

3.2 condition.await()源码解析

condition.await()源码解析,如下:
第一步,addConditionWaiter()方法将添加节点到等待队列中
第二步,addConditionWaiter()返回存放当前线程的新节点后,将节点作为fullyRelease()方法的参数,这个fullyRelease()方法是要将新节点中存放的当前的线程进行解锁,并返回savedState()
第三步,isOnSyncQueue()提供判断,LockSupport.park(this);将当前的线程进行park(park就是阻塞,unpark就是唤醒)

3.2.1 第一步:addConditionWaiter()添加节点到等待队列中

private Node addConditionWaiter() 
   Node t = lastWaiter;
   if (t != null && t.waitStatus != Node.CONDITION) 
       unlinkCancelledWaiters();
       t = lastWaiter;
   
   // 新建一个节点,节点存放当前线程,状态设置为正在等待条件CONDITION
   Node node = new Node(Thread.currentThread(), Node.CONDITION);
   if (t == null)
       firstWaiter = node;
   else
       t.nextWaiter = node;
   lastWaiter = node;
   return node;

addConditionWaiter()添加节点到等待队列中,包含三种情况:

第一种情况,当前等待队列中没有节点(此时firstWaiter和tailWaiter都为null)

第二种情况,当前等待队列中1-n个节点(此时firstWaiter和tailWaiter都不为null,如果一个节点,则首尾指针都指向这个节点,如果大于一个节点,则首尾指针指向相应的节点)

第三种情况,当前等待队列中1-n个节点(此时firstWaiter和tailWaiter都不为null,如果一个节点,则首尾指针都指向这个节点,如果大于一个节点,则首尾指针指向相应的节点),但是尾指针所指向节点不是在等待队列中等待( t.waitStatus != Node.CONDITION)

3.2.1.1 第一种情况:当前等待队列中没有节点

第一种情况,当前等待队列中没有节点(此时firstWaiter和tailWaiter都为null),从addConditionWaiter方法中抽取第一种情况的程序执行,如下:

Node t = lastWaiter;  // 因为要采用尾插法,先将尾指针记录下来
Node node = new Node(Thread.currentThread(), Node.CONDITION);
firstWaiter = node;   // 因为现在等待队列中就只有这一个刚刚新建的Node节点,所以,将负责等待队列的首尾指针都指向这个节点
lastWaiter = node;
return node;   // 返回当前线程新建好的这个节点

程序执行如上,一共五句Java代码:先将lastWaiter记录下来 Node t = lastWaiter;(因为要采用尾插法,先将尾指针记录下来),然后使用当前线程新建一个节点,这个新节点Node的thread属性为当前线程 Node node = new Node(Thread.currentThread(), Node.CONDITION);(表示这个节点存放的就是当前线程,将当前线程放到一个节点中,然后这个节点放入等待队列中),waitStatus=Condition(-2),然后再将负责等待队列的首尾指针都指向这个节点( firstWaiter = node; lastWaiter = node;),因为现在等待队列中就只有这一个刚刚新建的Node节点,最后返回当前线程新建好的这个节点 return node;

3.2.1.2 第二种情况:当前等待队列中1-n个节点

第二种情况,当前等待队列中1-n个节点(此时firstWaiter和tailWaiter都不为null,如果一个节点,则首尾指针都指向这个节点,如果大于一个节点,则首尾指针指向相应的节点) ,从addConditionWaiter方法中抽取第一种情况的程序执行,如下:

Node t = lastWaiter;  // 因为要采用尾插法,先将尾指针记录下来
Node node = new Node(Thread.currentThread(), Node.CONDITION);
t.nextWaiter = node;  // 尾插法经典两步:(1)当前节点下一个节点为新建节点;(2)等待队列尾指针指向新建节点
lastWaiter = node;
return node;  // 返回新加入等待队列的节点

程序执行为如上,先将lastWaiter记录下来 Node t = lastWaiter;(因为要采用尾插法,先将尾指针记录下来),然后使用当前线程新建一个节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); 然后将当前节点下一个节点为新建节点 t.nextWaiter = node;,在将等待队列尾指针指向新建节点 lastWaiter = node;,最后返回新加入等待队列的节点 return node;(里面存放当前线程) 。

尾插法经典两步:
(1) 当前节点下一个节点为新建节点 t.nextWaiter = node;
(2) 等待队列尾指针指向新建节点 lastWaiter = node;

3.2.1.3 第三种情况:当前等待队列中1-n个节点,但是尾指针所指向节点不是在等待队列中等待

第三种情况,当前等待队列中1-n个节点(此时firstWaiter和tailWaiter都不为null,如果一个节点,则首尾指针都指向这个节点,如果大于一个节点,则首尾指针指向相应的节点),但是尾指针所指向节点不是在等待队列中等待( t.waitStatus != Node.CONDITION),从addConditionWaiter方法中抽取第一种情况的程序执行,如下:

Node t = lastWaiter;     // 记录尾指针所指向节点,为使用尾插法准备
unlinkCancelledWaiters();   //相对于第二种的特殊情况,这里需要处理
t = lastWaiter;   //相对于第二种的特殊情况,这里需要处理
Node node = new Node(Thread.currentThread(), Node.CONDITION);
t.nextWaiter = node;   // 尾插法经典两步
lastWaiter = node;
return node;

第三种情况,执行Java程序一共七句,其中有五句和第二种情况一样,不解释了,看新增的两句

unlinkCancelledWaiters();   
// 解释:解绑所有的处于取消状态的等待者,
// 这个使用canceled,表示已取消状态,这里使用watiers,表示不止一个
t = lastWaiter;   // 解释:重置一下t,继续记录新的尾巴指针指向的节点,为下面尾插法准备

解释一下unlinkCancelledWaiters()程序

private void unlinkCancelledWaiters() 
    Node t = firstWaiter;   // 1、记录等待队列中头指针所指向节点
    // 为什么这里记录头指针指向,因为等待队列是非循环单链表,所以while循环删除已取消结点,只能从头结点开始遍历
    Node trail = null;   // 2、局部变量trail,下面不断移动t,用t来记录当前节点,但是因为等待列表是单链表,所以无法记录当前节点t的上一个节点,所有要在t还没有移动时候,将当前t记录下来放到trail中,然后t再移动
    while (t != null) 
        Node next = t.nextWaiter;   // 3、准备移动,trail记录t,单链表基本操作
        if (t.waitStatus != Node.CONDITION) 
            t.nextWaiter = null;   
            if (trail == null)  // 4、这是执行 trail=t 之前执行的,trail=t 执行之前,不断向后移动,同时不断修改头指针firstWaiter
                 // 4.1 为什么trail=t 执行之前要不断修改头指针firstWaiter?
                 //因为t.waitStatus != Node.CONDITION,当前队列不行,所以要不断修改头指针firstWaiter
                firstWaiter = next;  //唯一一个设置头指针的地方,
                 // 4.2 为什么执行了trail=t之后就不要修改头指针了?
                 // 因为只要找到了为Node.CONDITION的t,就不会删除了,就是保留操作了,就是可以使用的等待队列了
            else   // 这是执行 trail=t 之后执行的
                trail.nextWaiter = next;   //  5、执行了 trail=t 之后,trail -> t -> next,因为t.waitStatus != Node.CONDITION,所以要去掉t,就执行  trail.nextWaiter = next;  变为 trail -> next
                //5.1 为什么trail=t 执行之前不需要删除t,因为这时候trail==null
                //5.2 为什么trail=t 执行之后要删除t,因为这时候firstWaiter确定了,等待队列确定了,当然要删去不合法的t,t.waitStatus != Node.CONDITION
            if (next == null)  // 6、这是最后一次循环执行的,当next为null,表示后面后面没有了,要跳出while循环了,就是设置这是尾指针指向了,但是此时t.waitStatus != Node.CONDITION,不能设置 lastWaiter = t;所以设置为t的前置节点 lastWaiter = trail;   
                lastWaiter = trail;   
        
        else
            trail = t;   // t的上一个记录到trail中
        t = next;  // t移动
    

对于unlinkCancelledWaiters()方法的解释:

(1) 记录等待队列中头指针所指向节点 Node t = firstWaiter;

(2) 新建局部变量trail,用来不断移动t,用t来记录当前节点,但是因为等待列表是单链表,所以无法记录当前节点t的上一个节点,所有要在t还没有移动时候,将当前t记录下来放到trail中,然后t再移动 Node trail = null;

(3) 准备移动,trail记录t,这是单链表基本操作:while循环中第一句是 Node next = t.nextWaiter; ,最后一句是 t = next; ,合并起来就是 t= t.nextWaiter; 表示每次while循环都是移动一次t

(4) trail=t 执行之前,不断向后移动,同时不断修改头指针firstWaiter
① 为什么trail=t 执行之前要不断修改头指针firstWaiter?因为t.waitStatus != Node.CONDITION,当前队列不行,所以要不断修改头指针firstWaiter。
② 为什么执行了trail=t之后就不要修改头指针了?因为只要找到了为Node.CONDITION的t,就不会删除了,就是保留操作了,就是可以使用的等待队列了。

(5) 执行了 trail=t 之后,链表变为 trail -> t -> next,因为 t.waitStatus != Node.CONDITION,所以要去掉t,就执行 trail.nextWaiter = next; 将链表变为 trail -> next
① 为什么trail=t 执行之前不需要删除 t ?因为这时候trail==null。
② 为什么trail=t 执行之后要删除 t ?因为这时候firstWaiter确定了,等待队列确定了,当然要删去不合法的t,t.waitStatus != Node.CONDITION。

(6) if (next == null) 这是最后一次循环执行的,当next为null,表示后面后面没有了,要跳出while循环了,就是设置这是尾指针指向了,但是此时t.waitStatus != Node.CONDITION,不能设置 lastWaiter = t;所以设置为t的前置节点 lastWaiter = trail;

现在我来看来一个数据结构的基本问题,等待队列作为单链表,只有next指针,没有prev指针那么它是如何记录上一个节点的。

问题(单链表基础知识):trail是如何记录t的上一节点的
回答:如果代码写成 t= t.nextWaiter; 就直接移动了t指针,那么trail无法记录t的上一个节点,所以我们考虑如下:
步骤1:将t的移动拆分开来, t= t.nextWaiter;变为

Node next = t.nextWaiter;
trail = t;
t = next; 

第一句和第三句实际上就是 t= t.nextWaiter;我们将其拆分开来,在t值还没有修改的时候,在第二句的时候,执行 trail=t ,将t记录下来。

步骤2:最后将三句放在同一个while循环中就实时同步了

 while (t != null)
    Node next = t.nextWaiter;
    trail = t;
    t = next; 
 

3.2.2 第二步:fullyRelease()方法将新节点中存放的当前的线程进行解锁

addConditionWaiter()为等待队列添加新节点,并返回存放当前线程的新节点后,将节点作为fullyRelease() 方法的参数,这个fullyRelease()方法是要将新节点中存放的当前的线程进行解锁,并返回savedState()

final int fullyRelease(Node node) 
    boolean failed = true;   // 将标志位默认设置为失败,为什么刚开始的时候默认失败?因为刚开始的时候没有执行,一定要执行之后才设置为成功
    try 
        int savedState = getState();  // 获取当前类变量state,这个state是用来记录当前线程的加锁次数的(因为synchronized和lock都是可重入锁,所以可以加锁多次)
        if (release(savedState))   // 如果释放当前线程成功了,要阻塞,就是要进入等待队列,就要释放同步锁
            failed = false;  // 释放当前线程成功,那么failed标志位设置为false
            return savedState;   // 返回当前线程加锁次数,为0就是完全解锁成功了
         else 
            throw new IllegalMonitorStateException();
        
     finally 
        if (failed) // 如果failed标志位为true,表示释放锁失败,节点状态设置为被取消
            node.waitStatus = Node.CANCELLED;
    

解释一下fullyRelease方法,如下:
(1) 将标志位默认设置为失败 boolean failed = true; ,为什么刚开始的时候默认失败?因为刚开始的时候没有执行,一定要执行之后才设置为成功。
(2) 获取当前类变量state int savedState = getState(); ,这个state是用来记录当前线程的加锁次数的(因为synchronized和lock都是可重入锁,所以可以加锁多次)。
(3) 如果释放当前线程成功了 if (release(savedState)) ,要阻塞,就是要进入等待队列,就要释放同步锁。
(4) 释放当前线程成功,那么failed标志位设置为false failed = false;,并返回当前线程加锁次数,为0就是完全解锁成功了 return savedState;
(5) 如果failed标志位为true if (failed) ,表示释放锁失败,节点状态设置为被取消 node.waitStatus = Node.CANCELLED;

这个fullyRelease方法调用release方法完成释放锁,我们看一下release方法,如下:

public final boolean release(int arg)   // 参数就是线程加锁次数
    if (tryRelease(arg))    // 释放同步锁 上一篇博客讲过  参数是线程加锁次数
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);   
        return true;
    
    return false;

解锁的核心方法就是这个release()方法,fullyRelease()方法只是调用这个release()方法,最终还是由这个release()方法提供判断,tryRelease()只是给这个release()方法提供判断。对于release()释放同步锁的逻辑总共有三种情况:

情况1:只有一个线程加锁,没有形成AQS队列

这个并发过程中,只有一个线程加锁,所以AQS队列没有创建,这里if判断不成立,就是tryRelease()判断为false,release()方法直接返回false;

情况2:两个线程加锁,形成AQS队列,当线程B解锁的时候

在并发加锁过程中(就是上一篇博客中的lock.lock()加锁),线程A加锁成功,线程B也来加锁,但是现在线程A没有解锁,这时候形成一个AQS队列,(也就是一个加锁队列,线程A和线程B都锁在这里,线程A在线程B前面,就是上一篇博客中的),然后线程A解锁完成了,AQS队列中就只剩下线程B,然后线程B来解锁,这个时候线程B就是AQS队列的队首元素,这个时候队首线程B的waitStatus的值为0,if中的if也不会执行(有了AQS队列可以通过第一个if (tryRelease(arg)),但是 if (h != null && h.waitStatus != 0) 判断的时候 h != null 且 h.waitStatus == 0,所以无法通过第二个if)整个方法返回true。

情况3:两个线程加锁,形成AQS队列,当线程A解锁的时候

在并发加锁过程中(就是上一篇博客中的lock.lock()加锁),线程A加锁成功,线程B也来加锁,但是现在线程A没有解锁,这时候形成一个AQS队列,(也就是一个加锁队列,线程A和线程B都锁在这里,线程A在线程B前面,,就是上一篇博客中的),然后线程A先来解锁,这个时候线程A就是AQS队列的队首元素,由于AQS队列中有线程A和线程B两个元素,这个时候队首线程A的waitStatus的值不为0,if中的if执行,unparkSuccessor(h); 解锁头结点的下一个节点(就是解锁线程B),整个方法返回true。

最后,这里解释一下tryRelease(),很简单,如下:

protected final boolean tryRelease(int releases) 
    int c = getState() - releases;    // 线程加锁次数-线程加锁次数=0
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;   // 默认释放成功为false
    if (c == 0)   // 加锁次数为0
        free = true;    // 标志位释放同步锁成功
        setExclusiveOwnerThread(null);  // 独占线程为null
    
    setState(c);  // 重新设置类变量state 线程加锁次数
    return free;

小结一下fullyRelease()、release()、tryRelease()三个方法:调用链路是 fullyRelease() -> release() -> tryRelease(),三个方法中,解锁的核心方法就是这个release()方法,fullyRelease()方法只是调用这个release()方法,最终还是由这个release()方法提供判断,tryRelease()只是给这个release()方法提供判断。

3.2.3 第三步:阻塞当前线程

isOnSyncQueue()提供判断,LockSupport.park(this);将当前的线程进行park(解释:park就是阻塞,unpark就是唤醒)

final boolean isOnSyncQueue(Node node) 
    // 当前处于等待状态,而且同步队列中没有前驱者  
    if (node.waitStatus == Node.CONDITION || node.prev == null)   
        return false;
    // 如果这个node节点在同步队列中有后继者,他一定在同步队列中,
    // prev和next是作用于同步队列的指针,nextWaiter作用于等待队列的指针
    if (node.next != null) 
        return true;
    // 上面两个条件都不满足,都被else了  
    // node.next==null node.prev!=null&&node.waitStatus != Node.CONDITION
    return findNodeFromTail(node);  

// 能够进入这个方法的,一定是node.next==null node.prev!=null,
// 所以,在同步队列中,从后往前遍历,找到这个节点返回true,一直到最前面都没找到,返回false
private boolean findNodeFromTail(Node node)    
    Node t = tail;
    for (;;) 
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    

最后执行LockSupport.park(this),完成了将当前的线程进行park(解释:park就是阻塞,unpark就是唤醒)。

四、signal()/signalAll()源码

4.1 condition.notify()整体流程图

我们再来看看signal方法和signalAll方法的源码

当某个线程调用signal方法或者signalAll方法,其大致的步骤是:

第一步,将等待队列中的节点放到AQS同步执行队列中(每个节点Node里面存放着线程thread),具体地,signal方法会将当前的等待队列中第一个等待的线程的节点加入到原来的AQS队列中去,而signalAll方法是将等待队列中的所有的等待线程的节点全部加入到原来的AQS的队列中去。

第二步,获取同步锁:让他们重新获取锁,进行unpark。

第三步,唤醒当前线程:线程被唤醒,执行对应的线程中代码。

4.2 condition.notify()源码解析

4.2.1 等待队列中的节点放到AQS同步执行队列中

condition.notify()方法作用:将等待队列中的节点放到AQS同步执行队列中,其中每个节点Node里面存放着线程thread

condition.notify()方法步骤分为两步
步骤1:等待队列中的移除第一个节点
步骤2:同步队列中的添加这个节点

4.2.1.1 等待队列中的移除第一个节点

public final void signal() 
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;    // 将等待队列中头指针指向的节点记录下来,因为我们要删去等待队列中第一个节点
    if (first != null)
        doSignal(first);

private void doSignal(Node first) 
    do 
        if ( (firstWaiter = first.nextWaiter) == null)   // 下一个等待元素为空,表示等待队列中只有一

以上是关于阻塞与唤醒,等待队列的舞台的主要内容,如果未能解决你的问题,请参考以下文章

阻塞与唤醒,等待队列的舞台

阻塞与唤醒,等待队列的舞台

阻塞与唤醒,等待队列的舞台

Java多线程:阻塞队列与等待唤醒机制初探

进程的休眠与唤醒(等待队列)

在Linux驱动程序中,使用等待队列的作用?