原创JUC包源码分析06 | AbstractQueuedSynchronizer

Posted 开发自由行

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了原创JUC包源码分析06 | AbstractQueuedSynchronizer相关的知识,希望对你有一定的参考价值。

文章导读:




备注:JDK版本:1.8


在文章中,从源码的角度详细分析了AbstractQueuedSynchronizer类获取独占锁和共享锁的实现逻辑,并对锁的释放过程也做了充分的介绍。本文将对条件队列Condition的实现逻辑做细致的分析,并对AbstractQueuedSynchronizer其他的常用方法做源码的分析。


Condition类能够实现多线程的通信机制,类似synchronized的wait和notify操作。Condition类的内部也提供了相似的方法。

public interface Condition {
/**     * 线程等待直到被signal,或者中断 */ void await() throws InterruptedException;
/**     * 线程等待直到被signal,不会被中断.     */ void awaitUninterruptibly();
    /**     * 线程等待直到被signal、中断或者超时. */ long awaitNanos(long nanosTimeout) throws InterruptedException;
/** * 线程等待直到被signal、中断或者超时. */ boolean await(long time, TimeUnit unit) throws InterruptedException;
   /**     * 线程等待直到被signal、中断或者到达deadline时间. */ boolean awaitUntil(Date deadline) throws InterruptedException;
/**     * 唤醒等待的线程. */ void signal();
/**     * 唤醒等待的所有线程. */ void signalAll();}

Condition类是借助await()相关的方法使线程进入到等待状态;signal()、signalAll()方法唤醒等待的线程,两者的区别是signal()方法只能唤醒单个等待的线程,而signalAll()方法可以唤醒所有处于等待状态的线程。


既然Condition类是一个接口,那么自然需要相应的实现类来完成内部的逻辑。在JDK底层Condition的实现类有2个,如下图所示:

本文选取AbstractQueuedSynchronizer.ConditionObject类的实现逻辑,做源码层面的分析。


一、线程等待方法

从前面的Condition定义的接口源码,我们可以知道线程等待的方法包含await()、awaitNanos()和awaitUntil()等方法。这些方法实现的通用逻辑基本一致,只有一些超时情况的细节处理存在差异。


1、await()方法

调用await()方法,会使当前线程进入等待队列中进行等待。

/** * 使线程进入到等待队列  * 1、如果线程被干扰中断,抛出InterruptedException异常. * 2、阻塞线程,直到线程被signal或者干扰. */public final void await() throws InterruptedException {    // 如果线程被干扰 if (Thread.interrupted())        // 抛出InterruptedException异常 throw new InterruptedException();    // 往等待队列中添加node节点 Node node = addConditionWaiter();    // 释放锁,如果是重入锁,也会释放,返回线程重入锁的次数 int savedState = fullyRelease(node); int interruptMode = 0;    // 判断节点是否被signal加入到同步队列的尾部    // 如果没有,则证明该线程还是在等待队列中    // 挂起等待的线程 while (!isOnSyncQueue(node)) {        // 挂起当前线程  LockSupport.park(this);        // 检查线程是否被interrupt,被中断,则跳出循环        // 没被中断,且没有被signal则一直处于挂起状态 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }     // 到这里为止,出了循环,就意味着线程被signal或者中断    // 也就意味着node节点添加到了同步队列的尾部 // ----------------------------------------------------------    // acquireQueued()方法尝试获取锁,如果被中断,则返回true // 如果在获取锁的过程中被中断了,并且之前的 interruptMode != THROW_IE,    // 那么也视为在 signal() 之后被中断,设为REINTERRUPT if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;    // 只有在 signal() 前中断的线程才会在等待队列中留有nextWaiter节点    // 即会满足这个条件 if (node.nextWaiter != null) // 遍历等待队列,将状态不是CONDITION的节点从队列中删除 unlinkCancelledWaiters();  if (interruptMode != 0) // 抛出异常 或 重置中断标识位 reportInterruptAfterWait(interruptMode);}

代码注释中详细分析了await()方法的执行逻辑,具体经过了以下几个步骤:

  • 如果线程在进入等待队列之前就被中断,则抛出异常

  • 添加该线程封装的node节点到等待队列中

  • 不断轮询,判断该node节点是否进入到同步列尝试获取锁,同步队列中不存在,则挂起该线程

    • 当执行signal()方法后,会将node节点添加到同步队列尾部

    • 当线程被中断后,设置设置相应的interruptMode ,并将node节点添加到同步队列尾部

  • 节点进入到同步队列的尾部,会尝试获取锁

    • 如果获取锁的过程中,线程被中断,则会设置相应的interruptMode,也就意味着是线程被signal之后,遇到了中断操作 

    • 移除signal前中断,状态不是CONDITION的线程

    • 抛出异常或重置线程中断状态


1.1、 往等待队列中添加node节点的addConditionWaiter()方法

该方法的目的是往等待队列中添加该线程封装的node节点对象。

/** * 往等待队列中添加新的Node(Waiter). * @return Node */private Node addConditionWaiter() {    Node t = lastWaiter; // 获取等待队列的尾结点    // 如果尾结点的状态不是CONDITION,则会被清除. if (t != null && t.waitStatus != Node.CONDITION) {        //  unlinkCancelledWaiters();        // 重置等待队列的尾结点 t = lastWaiter; }    // 封装当前线程、mode为CONDITION的Node节点 Node node = new Node(Thread.currentThread(), Node.CONDITION);    // 等待队列未初始化 if (t == null)        // 设置头节点为当前node firstWaiter = node; else // 设置尾结点的nextWaiter为当前node t.nextWaiter = node;    // 尾结点指向当前node节点 lastWaiter = node; return node;}
// 遍历整个等待队列,舍弃状态不是Node.CONDITION的节点private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) {        // 获取后续waiter Node next = t.nextWaiter;        // 如果节点的waitStatus不是Node.CONDITION状态 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else                // 舍弃等待状态不为Node.CONDITION的节点                // 将trail的nextWaiter设置为当前节点的next节点 trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t;        // 依次遍历所有的节点 t = next; }}

addConditionWaiter()方法的核心是将调用了await()方法的线程添加到等待队列中,等待队列的数据存储结构类似同步队列,但是等待队列是单向链表。

【原创】JUC包源码分析06 | AbstractQueuedSynchronizer


1.2、轮询判断线程是否添加到同步队列,未添加成功则执行挂起操作的isOnSyncQueue()方法

如果未触发signal()方法或者当前线程未受到干扰中断的操作,则会在等待队列中进行挂起。

/** * 判断node节点是否进入同步队列 */final boolean isOnSyncQueue(Node node) {    // node节点的waitStatus是Node.CONDITION,则一定是在等待队列中 if (node.waitStatus == Node.CONDITION || node.prev == null) return false;    // 同步队列的next节点才不为null if (node.next != null) // If has successor, it must be on queue return true;    // 在同步队列中,从后往前查找node是否在里面 return findNodeFromTail(node);}// 从同步队列中查找node节点private boolean findNodeFromTail(Node node) { Node t = tail;    for (;;) { // 不断循环        if (t == node) // 如果是尾节点 return true; if (t == null) return false;        t = t.prev; // 获取前节点,继续判断 }}

当node节点进入到同步队列中,则会进行尝试获取锁的流程。相反,如果isOnSyncQueue()方法返回false,则意味着该node节点将持续在等待队列中等待,为了节约CPU资源,会将该线程进行挂起操作。在线程挂起的等待的过程中,如果线程被干扰,则会中断等待的过程。


1.3、node节点进入等待队列后的操作流程

// 返回true,则代表node节点要在等待队列中等待while (!isOnSyncQueue(node)) {     // 挂起线程 LockSupport.park(this);    // 检测挂起的线程是否被中断    // 被中断则中断循环 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;}/*** 检测挂起的线程的中断状态 * 如果没有被中断,返回 0* 如果在被signal之前中断了,返回 THROW_IE,表示需要抛出异常* 如果在signal之后中断了,返回 REINTERRUPT,表示不抛出,只恢复中断位*/ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;}
final boolean transferAfterCancelledWait(Node node) { // 如果CAS成功了,说明还没有被signal加入同步队列 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {        enq(node); // 添加到同步队列 return true; }    // 已经被signal了,防止还没被加入到同步队列的情况 while (!isOnSyncQueue(node)) Thread.yield(); return false;}

从上面的业务逻辑中,我们可以很清晰的知道当等待队列中的线程被干扰中断后,会进入到同步队列的尾部。当然,当执行signal()方法后,也会将当前线程所在的node节点添加到同步队列的尾部。


既然存在两种进入同步队列的场景,自然需要对这部分node节点做进一步的处理工作。

// 到这里为止,出了循环,就意味着线程被signal或者中断// 也就意味着node节点添加到了同步队列的尾部// ----------------------------------------------------------// acquireQueued()方法尝试获取锁,如果被中断,则返回true// 如果在获取锁的过程中被中断了,并且之前的 interruptMode != THROW_IE,// 那么也视为在 signal() 之后被中断,设为REINTERRUPTif (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;// 只有在 signal() 前中断的线程才会在等待队列中留有nextWaiter节点// 即会满足这个条件if (node.nextWaiter != null) // 遍历等待队列,将状态不是CONDITION的节点从队列中删除 unlinkCancelledWaiters(); if (interruptMode != 0) // 抛出异常 或 重置中断标识位 reportInterruptAfterWait(interruptMode);  // 遍历整个等待队列,舍弃状态不是Node.CONDITION的节点private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { // 获取后续waiter Node next = t.nextWaiter; // 如果节点的waitStatus不是Node.CONDITION状态 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else // 舍弃等待状态不为Node.CONDITION的节点 // 将trail的nextWaiter设置为当前节点的next节点 trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; // 依次遍历所有的节点 t = next; }}

上述源码注释部分,详细说明了这部分的逻辑,我们不得不感叹源码设计者思维的缜密和优秀。


awaitUntil()、awaitNanos()等方法的源码逻辑与await()的区别只是加了时间超时判断,其他逻辑完全一致。有兴趣的小伙伴建议结合源码和本文进行理解。


二、signal()、signalAll()方法

signal()方法用以唤醒等待队列中等待的第一个线程,并将该node节点添加到同步队列的尾部,进而能够进行锁获取的过程。

signalAll()可以唤醒所有等待队列中的线程。

// 从等待队列中找到第一个线程唤醒public final void signal() {     // 判断是否独占,类似synchronized,     // 等待的线程只能被让之前将它wait的线程signal if (!isHeldExclusively()) throw new IllegalMonitorStateException();    // 唤醒第一个node节点线程 Node first = firstWaiter; if (first != null) // 执行唤醒操作 doSignal(first);}
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null;        // 从等待队列中移出 first.nextWaiter = null; } while (!transferForSignal(first) &&             // 关注头节点 (first = firstWaiter) != null);}
// 将node节点从等待队列到同步队列final boolean transferForSignal(Node node) {    // 判断node节点是否已经CANNELED if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false;
    // 未CANNELLED,则添加到同步队列尾部 Node p = enq(node); int ws = p.waitStatus;     // 此时节点还是挂起的,获取锁需要将结点的状态改为SIGNAL    // 如果节点被CANNNELED了,或者CAS状态为SIGNAL失败了,那么就唤醒线程,    // 让其自己走去获取锁的步骤,虽然线程可能会被再次挂起,但这是无害的操作 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))        // 唤醒线程 LockSupport.unpark(node.thread); return true;}
// ------------------------------------------------------// signalAll()方法public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null)        // 唤醒所有的节点 doSignalAll(first);}
private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter;        first.nextWaiter = null// 移除节点        // 将所有未CANNELLED的节点添加到同步队列中 transferForSignal(first); first = next;    } while (first != null); //循环所有的节点}

上述源码的注释中,可以看出当调用signal()方法或者signalAll()的时候,都会讲等待队列中的node添加到同步队列中进行锁对象的获取。代码的注释表明了signal()方法和signalAll()方法的区别。


三、其他方法

// 判断是否其他线程的等待获取锁额时间超过当前线程public final boolean hasQueuedPredecessors() { Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());}
// 判断等待队列中是否有节点public final boolean hasQueuedThreads() { return head != tail;}
// 获取独占等待获取锁的线程集合public final Collection<Thread> getExclusiveQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>();    // 遍历同步队列 for (Node p = tail; p != null; p = p.prev) {        if (!p.isShared()) { // 判断是否是独占 Thread t = p.thread; if (t != null)                list.add(t); // 添加线程 } } return list;}
// 获取共享等待获取的线程集合public final Collection<Thread> getSharedQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) {        if (p.isShared()) {  // 共享 Thread t = p.thread; if (t != null) list.add(t); } } return list;}
// 获取等待队列中节点线程集合public final Collection<Thread> getWaitingThreads(ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner"); return condition.getWaitingThreads();}
// 等待队列中是否有等待的node节点线程public final boolean hasWaiters(ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner"); return condition.hasWaiters();}
// ..........

本文选择性的对开发中常用的其他方法做了说明。有兴趣的小伙伴,希望可以自己结合源码进行梳理阅读,学习贵在主动和坚持。


因作者能力有限,文中可能出现描述不清的地方,不足之处请指出!希望与你一起共进步^

本公众号 旨在记录 人开发工作、学习过程中的所得、所感和所想,欢迎一起交流学习

以上是关于原创JUC包源码分析06 | AbstractQueuedSynchronizer的主要内容,如果未能解决你的问题,请参考以下文章

JUC同步器框架AbstractQueuedSynchronizer源码图文分析

JUC源码分析-集合篇PriorityBlockingQueue

JUC包中的CountDownLatch源码实现分析

JUC并发编程与源码分析

JUC-AQS源码分析

JUC-AQS源码分析