原创JUC包源码分析06 | AbstractQueuedSynchronizer
Posted 开发自由行
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了原创JUC包源码分析06 | AbstractQueuedSynchronizer相关的知识,希望对你有一定的参考价值。
文章导读:
备注:JDK版本:1.8
在文章中,从源码的角度详细分析了AbstractQueuedSynchronizer类获取独占锁和共享锁的实现逻辑,并对锁的释放过程也做了充分的介绍。本文将对条件队列Condition的实现逻辑做细致的分析,并对AbstractQueuedSynchronizer其他的常用方法做源码的分析。
Condition类能够实现多线程的通信机制,类似synchronized的wait和notify操作。Condition类的内部也提供了相似的方法。
public interface Condition {
/**
* 线程等待直到被signal,或者中断
*/
void await() throws InterruptedException;
/**
* 线程等待直到被signal,不会被中断.
*/
void awaitUninterruptibly();
/**
* 线程等待直到被signal、中断或者超时.
*/
long awaitNanos(long nanosTimeout)
throws InterruptedException;
/**
* 线程等待直到被signal、中断或者超时.
*/
boolean await(long time, TimeUnit unit)
throws InterruptedException;
/**
* 线程等待直到被signal、中断或者到达deadline时间.
*/
boolean awaitUntil(Date deadline) throws InterruptedException;
/**
* 唤醒等待的线程.
*/
void signal();
/**
* 唤醒等待的所有线程.
*/
void signalAll();
}
Condition类是借助await()相关的方法使线程进入到等待状态;signal()、signalAll()方法唤醒等待的线程,两者的区别是signal()方法只能唤醒单个等待的线程,而signalAll()方法可以唤醒所有处于等待状态的线程。
既然Condition类是一个接口,那么自然需要相应的实现类来完成内部的逻辑。在JDK底层Condition的实现类有2个,如下图所示:
本文选取AbstractQueuedSynchronizer.ConditionObject类的实现逻辑,做源码层面的分析。
一、线程等待方法
从前面的Condition定义的接口源码,我们可以知道线程等待的方法包含await()、awaitNanos()和awaitUntil()等方法。这些方法实现的通用逻辑基本一致,只有一些超时情况的细节处理存在差异。
1、await()方法
调用await()方法,会使当前线程进入等待队列中进行等待。
/**
* 使线程进入到等待队列
* 1、如果线程被干扰中断,抛出InterruptedException异常.
* 2、阻塞线程,直到线程被signal或者干扰.
*/
public final void await() throws InterruptedException {
// 如果线程被干扰
if (Thread.interrupted())
// 抛出InterruptedException异常
throw new InterruptedException();
// 往等待队列中添加node节点
Node node = addConditionWaiter();
// 释放锁,如果是重入锁,也会释放,返回线程重入锁的次数
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断节点是否被signal加入到同步队列的尾部
// 如果没有,则证明该线程还是在等待队列中
// 挂起等待的线程
while (!isOnSyncQueue(node)) {
// 挂起当前线程
LockSupport.park(this);
// 检查线程是否被interrupt,被中断,则跳出循环
// 没被中断,且没有被signal则一直处于挂起状态
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 到这里为止,出了循环,就意味着线程被signal或者中断
// 也就意味着node节点添加到了同步队列的尾部
// ----------------------------------------------------------
// acquireQueued()方法尝试获取锁,如果被中断,则返回true
// 如果在获取锁的过程中被中断了,并且之前的 interruptMode != THROW_IE,
// 那么也视为在 signal() 之后被中断,设为REINTERRUPT
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 只有在 signal() 前中断的线程才会在等待队列中留有nextWaiter节点
// 即会满足这个条件
if (node.nextWaiter != null)
// 遍历等待队列,将状态不是CONDITION的节点从队列中删除
unlinkCancelledWaiters();
if (interruptMode != 0)
// 抛出异常 或 重置中断标识位
reportInterruptAfterWait(interruptMode);
}
代码注释中详细分析了await()方法的执行逻辑,具体经过了以下几个步骤:
如果线程在进入等待队列之前就被中断,则抛出异常
添加该线程封装的node节点到等待队列中
不断轮询,判断该node节点是否进入到同步队列尝试获取锁,同步队列中不存在,则挂起该线程
当执行signal()方法后,会将node节点添加到同步队列尾部
当线程被中断后,设置设置相应的interruptMode ,并将node节点添加到同步队列尾部
节点进入到同步队列的尾部,会尝试获取锁
如果获取锁的过程中,线程被中断,则会设置相应的interruptMode,也就意味着是线程被signal之后,遇到了中断操作
移除signal前中断,状态不是CONDITION的线程
抛出异常或重置线程中断状态
1.1、 往等待队列中添加node节点的addConditionWaiter()方法
该方法的目的是往等待队列中添加该线程封装的node节点对象。
/**
* 往等待队列中添加新的Node(Waiter).
* @return Node
*/
private Node addConditionWaiter() {
Node t = lastWaiter; // 获取等待队列的尾结点
// 如果尾结点的状态不是CONDITION,则会被清除.
if (t != null && t.waitStatus != Node.CONDITION) {
//
unlinkCancelledWaiters();
// 重置等待队列的尾结点
t = lastWaiter;
}
// 封装当前线程、mode为CONDITION的Node节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 等待队列未初始化
if (t == null)
// 设置头节点为当前node
firstWaiter = node;
else
// 设置尾结点的nextWaiter为当前node
t.nextWaiter = node;
// 尾结点指向当前node节点
lastWaiter = node;
return node;
}
// 遍历整个等待队列,舍弃状态不是Node.CONDITION的节点
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
// 获取后续waiter
Node next = t.nextWaiter;
// 如果节点的waitStatus不是Node.CONDITION状态
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
// 舍弃等待状态不为Node.CONDITION的节点
// 将trail的nextWaiter设置为当前节点的next节点
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
// 依次遍历所有的节点
t = next;
}
}
addConditionWaiter()方法的核心是将调用了await()方法的线程添加到等待队列中,等待队列的数据存储结构类似同步队列,但是等待队列是单向链表。
1.2、轮询判断线程是否添加到同步队列,未添加成功则执行挂起操作的isOnSyncQueue()方法
如果未触发signal()方法或者当前线程未受到干扰中断的操作,则会在等待队列中进行挂起。
/**
* 判断node节点是否进入同步队列
*/
final boolean isOnSyncQueue(Node node) {
// node节点的waitStatus是Node.CONDITION,则一定是在等待队列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 同步队列的next节点才不为null
if (node.next != null) // If has successor, it must be on queue
return true;
// 在同步队列中,从后往前查找node是否在里面
return findNodeFromTail(node);
}
// 从同步队列中查找node节点
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) { // 不断循环
if (t == node) // 如果是尾节点
return true;
if (t == null)
return false;
t = t.prev; // 获取前节点,继续判断
}
}
当node节点进入到同步队列中,则会进行尝试获取锁的流程。相反,如果isOnSyncQueue()方法返回false,则意味着该node节点将持续在等待队列中等待,为了节约CPU资源,会将该线程进行挂起操作。在线程挂起的等待的过程中,如果线程被干扰,则会中断等待的过程。
1.3、node节点进入等待队列后的操作流程
// 返回true,则代表node节点要在等待队列中等待
while (!isOnSyncQueue(node)) {
// 挂起线程
LockSupport.park(this);
// 检测挂起的线程是否被中断
// 被中断则中断循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
/**
* 检测挂起的线程的中断状态
* 如果没有被中断,返回 0
* 如果在被signal之前中断了,返回 THROW_IE,表示需要抛出异常
* 如果在signal之后中断了,返回 REINTERRUPT,表示不抛出,只恢复中断位
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
// 如果CAS成功了,说明还没有被signal加入同步队列
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node); // 添加到同步队列
return true;
}
// 已经被signal了,防止还没被加入到同步队列的情况
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
从上面的业务逻辑中,我们可以很清晰的知道当等待队列中的线程被干扰中断后,会进入到同步队列的尾部。当然,当执行signal()方法后,也会将当前线程所在的node节点添加到同步队列的尾部。
既然存在两种进入同步队列的场景,自然需要对这部分node节点做进一步的处理工作。
// 到这里为止,出了循环,就意味着线程被signal或者中断
// 也就意味着node节点添加到了同步队列的尾部
// ----------------------------------------------------------
// acquireQueued()方法尝试获取锁,如果被中断,则返回true
// 如果在获取锁的过程中被中断了,并且之前的 interruptMode != THROW_IE,
// 那么也视为在 signal() 之后被中断,设为REINTERRUPT
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 只有在 signal() 前中断的线程才会在等待队列中留有nextWaiter节点
// 即会满足这个条件
if (node.nextWaiter != null)
// 遍历等待队列,将状态不是CONDITION的节点从队列中删除
unlinkCancelledWaiters();
if (interruptMode != 0)
// 抛出异常 或 重置中断标识位
reportInterruptAfterWait(interruptMode);
// 遍历整个等待队列,舍弃状态不是Node.CONDITION的节点
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
// 获取后续waiter
Node next = t.nextWaiter;
// 如果节点的waitStatus不是Node.CONDITION状态
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
// 舍弃等待状态不为Node.CONDITION的节点
// 将trail的nextWaiter设置为当前节点的next节点
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
// 依次遍历所有的节点
t = next;
}
}
上述源码注释部分,详细说明了这部分的逻辑,我们不得不感叹源码设计者思维的缜密和优秀。
awaitUntil()、awaitNanos()等方法的源码逻辑与await()的区别只是加了时间超时判断,其他逻辑完全一致。有兴趣的小伙伴建议结合源码和本文进行理解。
二、signal()、signalAll()方法
signal()方法用以唤醒等待队列中等待的第一个线程,并将该node节点添加到同步队列的尾部,进而能够进行锁获取的过程。
signalAll()可以唤醒所有等待队列中的线程。
// 从等待队列中找到第一个线程唤醒
public final void signal() {
// 判断是否独占,类似synchronized,
// 等待的线程只能被让之前将它wait的线程signal
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 唤醒第一个node节点线程
Node first = firstWaiter;
if (first != null)
// 执行唤醒操作
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 从等待队列中移出
first.nextWaiter = null;
} while (!transferForSignal(first) &&
// 关注头节点
(first = firstWaiter) != null);
}
// 将node节点从等待队列到同步队列
final boolean transferForSignal(Node node) {
// 判断node节点是否已经CANNELED
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 未CANNELLED,则添加到同步队列尾部
Node p = enq(node);
int ws = p.waitStatus;
// 此时节点还是挂起的,获取锁需要将结点的状态改为SIGNAL
// 如果节点被CANNNELED了,或者CAS状态为SIGNAL失败了,那么就唤醒线程,
// 让其自己走去获取锁的步骤,虽然线程可能会被再次挂起,但这是无害的操作
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒线程
LockSupport.unpark(node.thread);
return true;
}
// ------------------------------------------------------
// signalAll()方法
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 唤醒所有的节点
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null; // 移除节点
// 将所有未CANNELLED的节点添加到同步队列中
transferForSignal(first);
first = next;
} while (first != null); //循环所有的节点
}
上述源码的注释中,可以看出当调用signal()方法或者signalAll()的时候,都会讲等待队列中的node添加到同步队列中进行锁对象的获取。代码的注释表明了signal()方法和signalAll()方法的区别。
三、其他方法
// 判断是否其他线程的等待获取锁额时间超过当前线程
public final boolean hasQueuedPredecessors() {
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
// 判断等待队列中是否有节点
public final boolean hasQueuedThreads() {
return head != tail;
}
// 获取独占等待获取锁的线程集合
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
// 遍历同步队列
for (Node p = tail; p != null; p = p.prev) {
if (!p.isShared()) { // 判断是否是独占
Thread t = p.thread;
if (t != null)
list.add(t); // 添加线程
}
}
return list;
}
// 获取共享等待获取的线程集合
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (p.isShared()) { // 共享
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
// 获取等待队列中节点线程集合
public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitingThreads();
}
// 等待队列中是否有等待的node节点线程
public final boolean hasWaiters(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.hasWaiters();
}
// ..........
本文选择性的对开发中常用的其他方法做了说明。有兴趣的小伙伴,希望可以自己结合源码进行梳理阅读,学习贵在主动和坚持。
以上是关于原创JUC包源码分析06 | AbstractQueuedSynchronizer的主要内容,如果未能解决你的问题,请参考以下文章
JUC同步器框架AbstractQueuedSynchronizer源码图文分析