jdk1.8 J.U.C并发源码阅读------AQS之conditionObject内部类分析
Posted Itzel_yuki
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jdk1.8 J.U.C并发源码阅读------AQS之conditionObject内部类分析相关的知识,希望对你有一定的参考价值。
一、继承关系
public class ConditionObject implements Condition, java.io.Serializable
实现了Condition接口和Serializable接口,是AbstractQueuedSynchronizer类的一个内部类。
二、成员变量
/** First node of condition queue. */
private transient Node firstWaiter;//Condition队列的头指针
/** Last node of condition queue. */
private transient Node lastWaiter;//Condition队列的尾指针
Condition队列的节点类型是AQS中另一个内部类Node类型的。
Node源码如下:
static final class Node
//标志Node的状态:独占状态。
static final Node SHARED = new Node();
//共享状态
static final Node EXCLUSIVE = null;
//因为超时或者中断,node会被设置成取消状态,被取消的节点时不会参与到竞争中的,会一直保持取消状态不会转变为其他状态;
//CLH队列中使用
static final int CANCELLED = 1;
//该节点的后继节点被阻塞,当前节点释放锁或者取消的时候(cancelAcquire)需要唤醒后继者。
//CLH队列中使用
static final int SIGNAL = -1;
//CONDITION队列中的状态,CLH队列中节点没有该状态,当将一个node从CONDITION队列中transfer到CLH队列中时,状态由CONDITION转换成0
//CLH队列不使用,CONDITION队列中使用
static final int CONDITION = -2;
//该状态表示下一次节点如果是Shared的,则无条件获取锁。
//CLH队列中使用
static final int PROPAGATE = -3;
//当一个新的node在CLH队列中被创建时初始化为0,在CONDITION队列中创建时被初始化为CONDITION状态
volatile int waitStatus;
//队列中的前驱节点
//CONDITION队列中使用
volatile Node prev;
//CLH队列中使用,指向下一个节点的引用
volatile Node next;
//当前线程
volatile Thread thread;
//CONDITION队列中指向下一个node的指针,CLH队列中不使用
Node nextWaiter;
final boolean isShared()
return nextWaiter == SHARED;
//返回前驱节点
final Node predecessor() throws NullPointerException
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
Node() // Used to establish initial head or SHARED marker
Node(Thread thread, Node mode) // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
Node(Thread thread, int waitStatus) // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
三、方法分析
(1)await():实现考虑中断的等待,若thread中断,则抛出异常。
总结:
await只能在当前线程获取了锁之后调用。因此CLH队列和CONDITION队列的情况为:当前处于CLH队列队首的节点调用await方法,新new一个node,添加到CONDITION队列队尾,然后在CLH队列队首释放当前线程占有的锁,唤醒后继节点。当前线程以新node的形式在CONDITION队列中park,等待被唤醒。
具体过程:
step1:将该线程封装成node,新节点的状态为CONDITION,添加到队列尾部
step2:尝试释放当前线程占有的锁,释放成功,则调用unparkSuccessor方法唤醒该节点在CLH队列中的后继节点。
step3:在while循环中调用isOnSyncQueue方法检测node是否再次transfer到CLH队列中(其他线程调用signal或signalAll时,该线程可能从CONDITION队列中transfer到CLH队列中),如果没有,则park当前线程,等待唤醒,同时调用checkInterruptWhileWaiting检测当前线程在等待过程中是否发生中断,设置interruptMode的值来标志中断状态。如果检测到当前线程已经处于CLH队列中了,则跳出while循环。
step4:调用acquireQueued阻塞方法来在CLH队列中获取锁。
step5:检查interruptMode的状态,在最后调用reportInterruptAfterWait统一抛出异常或发生中断。
基本流程:首先将node加入condition队列,然后释放锁,挂起当前线程等待唤醒,唤醒后重新在CLH队列中调用acquireQueued获取锁。(实现Object.wait方法的功能)
注意:
step1和step2的顺序不能颠倒,否则CONDITION队列尾部添加新节点可能需要考虑并发的情况。首先将该节点添加到队列尾部,然后在CLH队列首节点(当前线程拥有锁,一定在CLH队列的队首)释放锁并唤醒后继节点。
CONDITION队列节点状态分析:CONDITION状态和CANCELLED状态,新new的一个节点状态为CONDITION,当该节点在CLH队列队首执行fullyRelease时,释放锁失败,则该node在CONDITION队列中的状态会变成CANCELLED。(还可能为0,当超时将node从CONDITION队列中加到CLH队列中,该节点在CONDITION队列中的nextWaiter连接没有取消,此时该节点状态为0,不过后面调用unlinkCancelledWaiters将清除CONDITION队列中所有非CONDITION状态的节点)
await系列方法和signal系列都是在当前线程占有锁时才能正确执行,否则会抛出异常。
源码分析
public final void await() throws InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
//将该线程添加到CONDITION队列中
Node node = addConditionWaiter();
//该节点加入condition队列中等待,await则需要释放掉当前线程占有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断该节点是否在CLH队列中
while (!isOnSyncQueue(node))
//不在,则阻塞该节点
LockSupport.park(this);
//在阻塞的过程中发生中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
//出了while循环,代表线程被唤醒,并且已经将该node从CONDITION队列transfer到了CLH队列中
//acquireQueued在队列中获取锁,会阻塞当前线程,并且在上面while循环等待的过程中没有发生异常,则修改interruptMode状态为REINTERRUPT
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//该节点调用transferAfterCancelledWait添加到CLH队列中的,此时该节点的nextWaiter不为null,需要调用unlinkCancelledWaiters将该节点从CONDITION队列中删除,该节点的状态为0
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//如果interruptMode不为0,则代表该线程在上面过程中发生了中断或者抛出了异常,则调用reportInterruptAfterWait方法在此处抛出异常
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
//首先检查尾节点是否为cancelled状态的节点,如果是则调用unlinkCancelledWaiters删除CONDITION队列中所有cancelled状态的节点,不是,则直接将该新创建的节点添加到CONDITION队列的末尾。
private Node addConditionWaiter()
//尾指针
Node t = lastWaiter;
//如果尾节点状态是cancelled,则调用unlinkCancelledWaiters方法删除CONDITION链表中所有cancelled状态的节点
if (t != null && t.waitStatus != Node.CONDITION)
unlinkCancelledWaiters();
//t为新的尾节点
t = lastWaiter;
//创建一个node节点,状态为CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//添加到队尾
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
//遍历一次CONDITION链表,删除状态为CANCELLED的节点。
private void unlinkCancelledWaiters()
//首节点
Node t = firstWaiter;
Node trail = null;
while (t != null)
//下一个节点
Node next = t.nextWaiter;
//如果t的状态是cancelled的,则需要删除t
if (t.waitStatus != Node.CONDITION)
//清除t的nextWaiter连接
t.nextWaiter = null;
//删除的是首节点
if (trail == null)
firstWaiter = next;
else
//直接将前一个节点的连接指向该节点的下一个节点
trail.nextWaiter = next;
//设置新的尾节点
if (next == null)
lastWaiter = trail;
//状态为CONDITION的节点不需要清除
else
trail = t;
t = next;
//完全释放锁,释放成功则返回,失败则将当前节点的状态设置成cancelled表示当前节点失效
final int fullyRelease(Node node)
boolean failed = true;
try
//获取当前锁重入的次数
int savedState = getState();
//释放锁
if (release(savedState))
//释放成功
failed = false;
return savedState;
else
throw new IllegalMonitorStateException();
finally
//释放锁失败,则当前节点的状态变为cancelled(此时该节点在CONDITION队列中)
if (failed)
node.waitStatus = Node.CANCELLED;
//尝试释放锁,释放成功则调用unparkSuccessor唤醒后继节点
public final boolean release(int arg)
//调用tryRelease释放锁。
if (tryRelease(arg))
//释放成功,则查看head节点状态,如果不为null且状态不为0(为0表示没有后继或者当前节点已经unparkSuccessor过),则调用unparkSuccessor唤醒后继节点
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
return false;
//判断该节点是否在CLH队列中
final boolean isOnSyncQueue(Node node)
//如果该节点的状态为CONDITION(该状态只能在CONDITION队列中出现,CLH队列中不会出现CONDITION状态),或者该节点的prev指针为null,则该节点一定不在CLH队列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果该节点的next(不是nextWaiter,next指针在CLH队列中指向下一个节点)状态不为null,则该节点一定在CLH队列中
if (node.next != null) // If has successor, it must be on queue
return true;
//否则只能遍历CLH队列(从尾节点开始遍历)查找该节点
return findNodeFromTail(node);
//从尾节点开始,使用prev指针,遍历整个CLH队列
private boolean findNodeFromTail(Node node)
Node t = tail;
//从尾节点开始,使用prev指针,开始遍历整个CLH队列
for (;;)
//找到该节点
if (t == node)
return true;
//遍历完成,没有找到该节点
if (t == null)
return false;
t = t.prev;
//在等待后发生中断,在此处根据interruptMode统一处理
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
(2)awaitNanos:如果当前线程发生中断,则抛出异常;超时则强制transfer到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消)
与await的区别:
(1)超时则强制将该节点从CONDITION队列transfer到CLH队列中。
(2)阻塞调用的是LockSupport.parkNanos(this, nanosTimeout),带有时间
(3)每次循环都要更新nanosTimeout,如果超时则发生(1)
public final long awaitNanos(long nanosTimeout)
throws InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
//新创建一个CONDITION状态的节点,并加入队列尾部
Node node = addConditionWaiter();
//node(此时处于CLH队列队首)释放占有的锁(在CLH队列中出队了),并且唤醒后继节点
int savedState = fullyRelease(node);
//根据nanosTimeout,计算deadline
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
//检测当前节点是否处于CLH队列中,没有则park当前线程,等待signal唤醒(从而将node节点从CONDITION队列中transfer到CLH队列中)
while (!isOnSyncQueue(node))
//如果超时,则调用transferAfterCancelledWait将当前Node强制transfer到CLH队列中
if (nanosTimeout <= 0L)
transferAfterCancelledWait(node);
break;
//nanosTimeout大于spinForTimeoutThreshold,则调用parkNanos等待nanosTimeout时间
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//park的过程中发生中断,则跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
//更新nanosTimeout
nanosTimeout = deadline - System.nanoTime();
//出了while循环,代表线程被唤醒,并且已经将该node从CONDITION队列transfer到了CLH队列中,或者发生中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//该节点调用transferAfterCancelledWait添加到CLH队列中的,此时该节点的nextWaiter不为null,需要调用unlinkCancelledWaiters将该节点从CONDITION队列中删除
if (node.nextWaiter != null)
unlinkCancelledWaiters();
//统一处理上面发生的中断或者异常情况。
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
//返回距离超时还剩多长时间
return deadline - System.nanoTime();
final boolean transferAfterCancelledWait(Node node)
//将该节点状态由CONDITION变成0,调用enq将该节点从CONDITION队列添加到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消)
if (compareAndSetWaitStatus(node, Node.CONDITION, 0))
enq(node);
return true;
//循环检测该node是否已经成功添加到CLH队列中
while (!isOnSyncQueue(node))
Thread.yield();
return false;
(3)awaitUntil:在deadline时间之前没有被唤醒,则强制transfer到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消),发生中断则抛出异常
public final boolean awaitUntil(Date deadline)
throws InterruptedException
//获取绝对时间
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
//新new一个node,添加到CLH队列的尾部
Node node = addConditionWaiter();
//释放锁
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
//循环检测该node是否已经成功添加到CLH队列中
while (!isOnSyncQueue(node))
//超时,强制transfer到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消)
if (System.currentTimeMillis() > abstime)
timedout = transferAfterCancelledWait(node);
break;
//park到abstime时间
LockSupport.parkUntil(this, abstime);
//发生异常,跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
//已经在CLH队列中了,或者抛出了异常
//调用acquireQueued(同步阻塞方法)在CLH队列中获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//该节点调用transferAfterCancelledWait添加到CLH队列中的,此时该节点的nextWaiter不为null,需要调用unlinkCancelledWaiters将该节点从CONDITION队列中删除
if (node.nextWaiter != null)
unlinkCancelledWaiters();
//统一处理上面发生的中断或者异常情况。
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
//返回是否超时
return !timedout;
(4)await(long time, TimeUnit unit):中断则抛出异常;超时强制转换到CLH队列中(在CONDITION队列中的nextWaiter连接并没有取消,此时同时处于CLH队列和CONDITION队列中)
public final boolean await(long time, TimeUnit unit)
throws InterruptedException
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
//新new一个node,添加到CONDITION队列末尾
Node node = addConditionWaiter();
//释放锁(此节点在CLH队列中拥有锁,此时是CLH队列头结点)并唤醒后继节点。
int savedState = fullyRelease(node);
//计算deadline
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
//检测当前节点是否处于CLH队列中,没有则park当前线程,等待signal唤醒(从而将node节点从CONDITION队列中transfer到CLH队列中)
while (!isOnSyncQueue(node))
if (nanosTimeout <= 0L)
timedout = transferAfterCancelledWait(node);
break;
//park
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
//更新nanosTimeout
nanosTimeout = deadline - System.nanoTime();
//在CLH队列中获取锁,或者发生中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
//统一处理上面发生的中断或者异常情况。
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
(5)awaitUninterruptibly:忽略中断的等待
public final void awaitUninterruptibly()
//将该线程封装成node,新节点的状态为CONDITION,添加到队列尾部
Node node = addConditionWaiter();
//在CLH队列首部释放占有的锁
int savedState = fullyRelease(node);
boolean interrupted = false;
//循环检测该node是否已经成功添加到CLH队列中
while (!isOnSyncQueue(node))
LockSupport.park(this);
//用interrupted保存中断标志,不抛出异常
if (Thread.interrupted())
interrupted = true;
//在CLH队列中获取锁 或者 interrupted发生中断了,则调用selfInterrupt发生中断
//acquireQueued是一个阻塞方法
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
(6)signal:对CONDITION队列中第一个CONDITION状态的节点(将该节点以及前面的CANCELLED状态的节点从CONDITION队列中出队),将该节点从CONDITION队列中添加到CLH队列末尾,同时需要设置该节点在CLH队列中前驱节点的状态(若前驱节点为cancelled状态或者给前驱节点执行CAS操作失败,则需要调用park操作在此处唤醒该线程,否则就是在CLH队列中设置前驱节点的signal状态成功,则不用在此处唤醒该线程,唤醒工作交给前驱节点,可以少进行一次park和unpark操作)
//唤醒CONDITION队列中首部的第一个CONDITION状态的节点
public final void signal()
//判断锁是否被当前线程独占,如果不是,则当前线程不能signal其他线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//CONDITION队列不为null,则doSignal方法将唤醒CONDITION队列中所有的节点线程
if (first != null)
doSignal(first);
//对CONDITION队列中从首部开始的第一个CONDITION状态的节点,执行transferForSignal操作,将node从CONDITION队列中转换到CLH队列中,同时修改CLH队列中原先尾节点的状态
private void doSignal(Node first)
do
//当前循环将first节点从CONDITION队列transfer到CLH队列
//从CONDITION队列中删除first节点,调用transferForSignal将该节点添加到CLH队列中,成功则跳出循环
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
while (!transferForSignal(first) &&
(first = firstWaiter) != null);
//两步操作,首先enq将该node添加到CLH队列中,其次若CLH队列原先尾节点为CANCELLED或者对原先尾节点CAS设置成SIGNAL失败,则唤醒node节点;否则该节点在CLH队列总前驱节点已经是signal状态了,唤醒工作交给前驱节点(节省了一次park和unpark操作)
final boolean transferForSignal(Node node)
//如果CAS失败,则当前节点的状态为CANCELLED
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//enq将node添加到CLH队列队尾,返回node的prev节点p
Node p = enq(node);
int ws = p.waitStatus;
//如果p是一个取消了的节点,或者对p进行CAS设置失败,则唤醒node节点,让node所在线程进入到acquireQueue方法中,重新进行相关操作
//否则,由于该节点的前驱节点已经是signal状态了,不用在此处唤醒await中的线程,唤醒工作留给CLH队列中前驱节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
(7)signalAll:将CONDITION队列中所有node出队,逐个添加到CLH队列末尾,同时修改它们在CLH队列中前驱节点的状态,修改为signal成功,则不用在此处唤醒该节点的线程,唤醒工作交给CLH队列中的前驱节点,否则需要在此处park当前线程。
public final void signalAll()
//查看当前线程是否独占锁,若不是,则当前线程没有权限执行signalAll操作,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//唤醒CONDITION队列中所有节点,同时transfer到CLH队列中
if (first != null)
doSignalAll(first);
private void doSignalAll(Node first)
lastWaiter = firstWaiter = null;
do
//将first节点从CONDITION队列中出队
Node next = first.nextWaiter;
first.nextWaiter = null;
//将first节点在CLH队列中入队,同时可能需要执行unpark操作
transferForSignal(first);
//更新first的指向
first = next;
while (first != null);
以上是关于jdk1.8 J.U.C并发源码阅读------AQS之conditionObject内部类分析的主要内容,如果未能解决你的问题,请参考以下文章
jdk1.8 J.U.C并发源码阅读------ReentrantLock源码解析
jdk1.8 J.U.C并发源码阅读------ReentrantLock源码解析
jdk1.8 J.U.C并发源码阅读------CountDownLatch源码解析
jdk1.8 J.U.C并发源码阅读------CountDownLatch源码解析