Java 1.6 AbstractQueuedSynchronizer源码解析
Posted Mr-yuenkin
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 1.6 AbstractQueuedSynchronizer源码解析相关的知识,希望对你有一定的参考价值。
由于本人水平与表达能力有限,有错误的地方欢迎交流与指正。
1 简介
AQS开放了几个方法交由子类实现(本类中抛出UnsupportedOperationException),分别是:
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
子类需要实现上面的5个方法,注意这个方法内部需要考虑线程安全问题。看看JDK1.7下AQS的子类:
AQS是Java并发框架的一个基础组件,JUC下很多类都是基于它实现的,如:ReentrantLock、ReentrantReadWriterLock、信号量、线程池等。AQS有两种工作模式:独占模式和共享模式。这两种模式典型的应用可以ReentrantReadWriterLock为例:如果一个线程(写线程)以独占模式(acquire)获取到这个锁,那么其他线程(读或写线程)不管以哪种模式尝试获取锁(共享模式是通过acquireShared获取的),都会失败;如果一个线程(读线程)以共享模式获取到这个锁,那么其他读线程可以获取到锁,而写线程会获取失败。
2 Node内部类
static final class Node
/** 表示取消状态*/
static final int CANCELLED = 1;
/** 表示后面有结点需要unPark*/
static final int SIGNAL = -1;
/** 表示当前结点在Condition的条件队列里*/
static final int CONDITION = -2;
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node until
* transferred. (Use of this value here
* has nothing to do with the other uses
* of the field, but simplifies mechanics.)
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified only using
* CAS.
*/
volatile int waitStatus;
/**
* 入队的时候初始化赋值,出队的时候置空。如果前驱结点状态是CANCELED,
* 那么需要通过prev域寻找一个非CANCELED的结点,将其next域直接指向
* 后面一个非CANCELED结点。总会有一个这样的结点的,至少头节点就是,
* 成为头节点只有一种可能:acquire成功了!如果结点被取消了,那它永远
* 不会acquire成功,而且一个节点只能取消自己对应的结点,不能取消其他的
*/
volatile Node prev;
/**
* 一旦当前结点调用release方法,就会unPark这个next结点。在入队列的时
* 候设置这个域,不用的时候设置成null。一旦取消当前结点,我们不能设置
* 这个域(但是我们发现cancelAcquire函数里是把next指向自己的),通过
* 把状态修改成CANCELED,这样就能绕过这个结点了。在enq函数中,仅当CAS
* 设置tail成功时才会设置next域。因此,如果某个结点的next域为null,并
* 不能说明这个结点就是tail。然而,如果我们发现某个结点的next域为null,
* 那需要做个double-check(从tail开始,利用prev向前进行遍历)。
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* 这个域是Condition条件队列使用的,这个队列是个单向链表,指向下一个节点
* 。或者值是SHARED,表示共享模式。由于条件队列只能在独占模式下访问,所以
* 我们只需要一个单向链表就可以保存等待的结点了(ConditionObject类)。
* 这些节点会转移到AQS的同步等待队列里,然后重新acquire。
*/
Node nextWaiter;
3 独占模式
3.1 acquire
public final void acquire(int arg)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果获取锁过程中(park时)被中断过,现在恢复中断
selfInterrupt();
独占模式下进行请求,忽略中断。方法实现中至少会调用一次tryAcquire方法, 请求成功后方法返回。否则当前线程会排队,可能会重复的阻塞和解除阻塞, 执行tryAcquire方法,直到成功。这个方法可以用来实现Lock的lock方法。
private Node addWaiter(Node mode)
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null)
// 先设置了node.prev,但是pred.next要在compareAndSetTail执行
// 成功后才设置的
node.prev = pred;
if (compareAndSetTail(pred, node))
pred.next = node;
return node;
enq(node);
return node;
private Node enq(final Node node)
for (;;)
Node t = tail;
if (t == null) // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h))
tail = node;
return h;
else
node.prev = t;
if (compareAndSetTail(t, node))
t.next = node;
return t;
addWaiter就是创建一个准备入队的node对象并将新节点放入队列尾部。先尝试较短的路径(tail不为空的场景),其实这个代码和enq函数的else分支的代码是一样的;如果尝试成功就直接return,否则,进入enq方法了。从enq方法可以看出,开始时这个队列有个冗余的首结点,执行一段时间后这个首节点表示的是当前获取到锁的结点。
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node 当前线程自旋的结点
* @param arg the acquire argument
* @return @code true if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg)
try
boolean interrupted = false;
for (;;)
final Node p = node.predecessor();
// 可以体现出是FIFO,只有第一个等待的Node才有可能获取锁
if (p == head && tryAcquire(arg))
// 如果锁定成功,则将当前结点设置为队列的头节点
// 且将node的thread、prev域置为null(头节点是dummy节
// 点,同时如下的help GC)。
setHead(node);
p.next = null; // help GC
return interrupted;
// 如果满足park条件就休眠了(如果锁可用的话,前面的Node会通
// 知),节省CPU;要是休眠过程中(LockSupport.park)被中断了,
// 那么会清除中断标识继续循环。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
catch (RuntimeException ex)
cancelAcquire(node);
throw ex;
acquireQueued函数就是一个循环,不断尝试获取锁直至获取成功,中间可能会阻塞(park),但是不响应中断(中断后继续循环)。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
int s = pred.waitStatus;
if (s < 0)
return true;
if (s > 0)
// 前面连续的CANCEL状态的Node需要删除掉
// 不会出现前面所有的结点都是CANCEL的,因为头节点已经获取到锁了,
// 是正在运行中的。因此,最后的pred不会为空。
do
node.prev = pred = pred.prev;
while (pred.waitStatus > 0);
pred.next = node;
else
// 本次循环不需要park,此时将pred节点状态设置成SIGNAL;
// 第二次循环再进来的时候就需要park了
compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
return false;
shouldParkAfterFailedAcquire函数作用:当获取锁失败时判断当前线程是否需要挂起来。其实,除非获取锁成功,否则都会挂起来的(基本就是两次机会)。
private final boolean parkAndCheckInterrupt()
LockSupport.park(this);
// 清除中断标识,并返回清除前的标识
return Thread.interrupted();
acquireQueued异常时会调用cancelAcquire方法:
private void cancelAcquire(Node node)
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// 跳过CANCEL状态的结点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// Getting this before setting waitStatus ensures staleness
// pred不会为空的,因为头节点是在运行中的,肯定满足条件
Node predNext = pred.next;
// 这里没有使用CAS操作,直接赋值了确保是CANCELED
node.waitStatus = Node.CANCELLED;
// 如果是尾节点,尝试删除node;
// 删除失败了也没关系,因为状态已经设置成CANCEL了。
if (node == tail && compareAndSetTail(node, pred))
compareAndSetNext(pred, predNext, null);
else
// 如果pred不是头节点且尝试设置pred.waitStatus=SIGNAL成功了
// 则尝试把pred.next链接到node.next上
if (pred != head
&& (pred.waitStatus == Node.SIGNAL
|| compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
&& pred.thread != null)
// If successor is active, set predecessor's next link
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
else
// (如果pred是头节点)或者(pred不是头节点但是设置
// pred.waitStatus=SIGNAL失败了),则需要给后面等待锁的线程发个
// unPark信号。pred不是头节点也给后面的Node发unPark信号也没有问
// 题,线程唤醒后还会继续检查状态的,如果还没轮到它还会继续park的。
unparkSuccessor(node);
node.next = node; // help GC
这个函数有些小复杂,有很多CAS操作,这些操作就执行一次,失败就失败了,要细细体会。
private void unparkSuccessor(Node node)
/*
* Try to clear status in anticipation of signalling. It is
* OK if this fails or if status is changed by waiting thread.
*/
compareAndSetWaitStatus(node, Node.SIGNAL, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 需要唤醒后面没有被取消的结点;从上面的enq函数我们知道如果结点的next
// 域为null并不能说明它就是tail结点,因此,我们要从tail结点反过来查找
// 距离node最近的不是CANCELED结点。最后,unPark这个结点。
Node s = node.next;
if (s == null || s.waitStatus > 0)
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
if (s != null)
LockSupport.unpark(s.thread);
3.2 release
独占模式下的释放方法,比较简单,如果tryRelease成功了且头结点不空且waitStatus!=0就唤醒后面排队的结点:
public final boolean release(int arg)
if (tryRelease(arg))
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
return false;
3.3 acquireInterruptibly
独占模式,acquire的响应中断的版本,他俩结构很像先tryAcquire,再循环。
// 函数会抛出InterruptedException异常
public final void acquireInterruptibly(int arg) throws InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
// 和acquireQueued很像
doAcquireInterruptibly(arg);
private void doAcquireInterruptibly(int arg)
throws InterruptedException
final Node node = addWaiter(Node.EXCLUSIVE);
try
for (;;)
final Node p = node.predecessor();
if (p == head && tryAcquire(arg))
setHead(node);
p.next = null; // help GC
return;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
break;
catch (RuntimeException ex)
cancelAcquire(node);
throw ex;
// Arrive here only if interrupted
// 这里和acquireQueued不一样,被中断后先cancelAcquire,再出抛异常
cancelAcquire(node);
throw new InterruptedException();
3.4 tryAcquireNanos
独占模式,响应中断且还支持超时,acquireInterruptibly的升级版,结构也很像。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
try
for (;;)
final Node p = node.predecessor();
if (p == head && tryAcquire(arg))
setHead(node);
p.next = null; // help GC
return true;
// 不支持<=0的值,直接取消并返回false
if (nanosTimeout <= 0)
cancelAcquire(node);
return false;
// 如果时间<=1000纳秒,继续循环
if (nanosTimeout > spinForTimeoutThreshold &&
shouldParkAfterFailedAcquire(p, node))
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
// 注意:如果此时修改系统时间,会引起误差。系统时间调大可能就直
// 接被cancel或者park的时间变小了;系统时间调小,park的时间
// 就变大了
nanosTimeout -= now - lastTime;
lastTime = now;
// 如果被中断了,就退出循环进入后面的取消流程
if (Thread.interrupted())
break;
catch (RuntimeException ex)
cancelAcquire(node);
throw ex;
// Arrive here only if interrupted
cancelAcquire(node);
throw new InterruptedException();
4 共享模式
4.1 acquireShared
共享模式,忽略中断,一样的思路,先try一下,如果失败了就入队列。有个小区别:tryAcquireShared返回值小于0表示失败,而不再是简单的boolean了。
public final void acquireShared(int arg)
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
private void doAcquireShared(int arg)
// 这里是SHARED,而acquire是EXCLUSIVE。
final Node node = addWaiter(Node.SHARED);
try
boolean interrupted = false;
for (;;)
final Node p = node.predecessor();
if (p == head)
int r = tryAcquireShared(arg);
if (r >= 0)
// 重置head并传播给后面的共享结点
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 如果循环过程中被中断了,恢复中断标识
if (interrupted)
selfInterrupt();
return;
// 如果满足park条件就休眠了(如果锁可用的话,前面的Node会通
// 知),节省CPU;要是休眠过程中(LockSupport.park)被中断了,
// 那么会清除中断标识继续循环。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
catch (RuntimeException ex)
cancelAcquire(node);
throw ex;
private void setHeadAndPropagate(Node node, int propagate)
// 重置head
setHead(node);
if (propagate > 0 && node.waitStatus != 0)
/*
* 如果s为null,这里就简单化处理了,也直接调用unparkSuccessor方
* 法,不要用复杂的算法计算出第一个shared后继结点了。
*/
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
4.2 releaseShared
和release结构很像,除了第一行有个try:
public final boolean releaseShared(int arg)
if (tryReleaseShared(arg))
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
return false;
4.3 acquireSharedInterruptibly
共享模式,响应中断,acquireShared的升级版,函数结构也很类似:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
该函数也会抛出InterruptedException异常。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException
final Node node = addWaiter(Node.SHARED);
try
for (;;)
final Node p = node.predecessor();
if (p == head)
int r = tryAcquireShared(arg);
if (r >= 0)
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 不同点:这里会跳出循环,然后cancel并抛出异常
break;
catch (RuntimeException ex)
cancelAcquire(node);
throw ex;
// Arrive here only if interrupted
cancelAcquire(node);
throw new InterruptedException();
4.4 tryAcquireSharedNanos
共享模式,响应中断,支持超时时间设置,acquireSharedInterruptibly的升级版:
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
可以看到,带超时参数的获取锁方法都带有boolean返回值的,表示获取锁成功还是失败。doAcquireSharedNanos思路跟上面的doAcquireNanos类似,这里就不重复介绍了,代码在下面:
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException
long lastTime = System.nanoTime();
// 这里是SHARED
final Node node = addWaiter(Node.SHARED);
try
for (;;)
final Node p = node.predecessor();
if (p == head)
int r = tryAcquireShared(arg);
if (r >= 0)
setHeadAndPropagate(node, r);
p.next = null; // help GC
return true;
// 不支持<=0的值,直接取消并返回false
if (nanosTimeout <= 0)
cancelAcquire(node);
return false;
// 如果时间<=1000纳秒,继续循环
if (nanosTimeout > spinForTimeoutThreshold &&
shouldParkAfterFailedAcquire(p, node))
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
// 注意:如果此时修改系统时间,会引起误差。系统时间调大可能就直
// 接被cancel或者park的时间变小了;系统时间调小,park的时间
// 就变大了
nanosTimeout -= now - lastTime;
lastTime = now;
// 如果被中断了,就退出循环进入后面的取消流程
if (Thread.interrupted())
break;
catch (RuntimeException ex)
cancelAcquire(node);
throw ex;
// Arrive here only if interrupted
cancelAcquire(node);
throw new InterruptedException();
5 其他方法
5.1 hasQueuedThreads
查询队列里是否有线程在等待,由于中断和超时引起的取消操作随时都可能发生,这方法不一定实时准确。
public final boolean hasQueuedThreads()
return head != tail;
5.2 hasContended
查询队列是否被多个acquire请求竞争过(导致某个线程阻塞过)。为什么head 不为null就能证明?有竞争就会入队列此时head不为null,但是任务执行完了呢?通过上面的代码知道,head是由队列里刚获得到锁的线程设置的 (把自己设置成head),即使任务执行完也不会修改head,只能由下个入队的线程设置,这样head就永远不会为空了。
public final boolean hasContended()
return head != null;
5.3 getFirstQueuedThread
返回队列里第一个没有获取到锁的线程,如果head等于tail说明队列里没有线程在等待,直接返回null;否则,调用fullGetFirstQueuedThread。
public final Thread getFirstQueuedThread()
// handle only fast path, else relay
return (head == tail)? null : fullGetFirstQueuedThread();
private Thread fullGetFirstQueuedThread()
/*
* The first node is normally h.next. Try to get its
* thread field, ensuring consistent reads: If thread
* field is nulled out or s.prev is no longer head, then
* some other thread(s) concurrently performed setHead in
* between some of our reads. We try this twice before
* resorting to traversal.
*/
// 一般来说这个线程就是head->next,需要保证在并发情况下读一致性:
// 1. (h = head) != null
// 2. (s = h.next) != null
// 3. s.prev = head
// 4. (st = s.thread) != null
// 假设2、3中间被并发的插入了一个setHead方法,执行3时发现s.prev为空了
// 因此,这里需要再试一次(第二次其实也有可能会失败,不过概率已经很小
// 了,就像连续被雷劈两次一样;即使第二次也失败了,还有后面最后一道安
// 全措施,从tail开始向前遍历寻找)。
Node h, s;
Thread st;
if (((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null) ||
((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null))
return st;
/*
* Head's next field might not have been set yet, or may have
* been unset after setHead. So we must check to see if tail
* is actually first node. If not, we continue on, safely
* traversing from tail back to head to find first,
* guaranteeing termination.
*/
// head.next也有可能被并发修改,比如上面的1、2中间插入了一个acquireQueued
// 方法,执行完setHeader方法后将head.next置为null,这样条件2就不成立了,
// 因此,就进入下面的最后一道程序了。
Node t = tail;
Thread firstThread = null;
while (t != null && t != head)
Thread tt = t.thread;
if (tt != null)
firstThread = tt;
t = t.prev;
return firstThread;
5.4 isQueued
判断线程是否在队列里(包含头节点了),跟getFirstQueuedThread不一样,没有先从head开始找,直接从tail开始反向搜索,很直接。因为getFirstQueuedThread要找的是第一个,从head开始找效率比较高,从tail开始反向遍历是因为没有其他更好的方法了。isQueued不一样,它是找一个节点,反正都要遍历一遍,从head或tail都一样,时间复杂度都是O(n)。
public final boolean isQueued(Thread thread)
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
if (p.thread == thread)
return true;
return false;
5.5 apparentlyFirstQueuedIsExclusive
队列里第一个等待的结点是否是独占模式(不知道这里为啥没考虑并发情况,没有从tail开始遍历)。
final boolean apparentlyFirstQueuedIsExclusive()
Node h, s;
return ((h = head) != null && (s = h.next) != null &&
s.nextWaiter != Node.SHARED);
5.6 isFirst
当前线程是否是队列里的第一个元素。
final boolean isFirst(Thread current)
Node h, s;
return ((h = head) == null ||
((s = h.next) != null && s.thread == current) ||
fullIsFirst(current));
final boolean fullIsFirst(Thread current)
// same idea as fullGetFirstQueuedThread
Node h, s;
Thread firstThread = null;
if (((h = head) != null && (s = h.next) != null &&
s.prev == head && (firstThread = s.thread) != null))
return firstThread == current;
Node t = tail;
while (t != null && t != head)
Thread tt = t.thread;
if (tt != null)
firstThread = tt;
t = t.prev;
return firstThread == current || firstThread == null;
5.7 getQueueLength
队列长度,不包括已经取消的和头节点,因为它俩的thread域都为null。
public final int getQueueLength()
int n = 0;
for (Node p = tail; p != null; p = p.prev)
if (p.thread != null)
++n;
return n;
6 ConditionObject内部类
Condition的使用与原理可以参考其他资料:http://blog.csdn.net/ghsau/article/details/7481142
6.1 await
支持中断的等待方法:
public final void await() throws InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 创建一个节点并将其加入condition的等待列表里,如果最后一个节点的waitStatus不
// 是CONDITION,需要将其移除。
Node node = addConditionWaiter();
// 模拟synchronized语义,在wait前要先释放对象锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 模拟obj.wait语义,阻塞自己
while (!isOnSyncQueue(node))
LockSupport.park(this);
// 检查下是否有中断,如果有,还要区分下是signal前还是signal后。signal前中断的
// 的话要抛出InterruptedException,否则,只需要设置中断标识。怎么判断是前还是
// 后?依据就是node.waitStatus域,如果还是CONDITION,说明还没被signal;如果
// 是0,说明已经被signal函数修改过了。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 模拟synchronized语义,obj.wait被唤醒后要重新获取obj对象锁。
// 执行到这里说明node已经在AQS队列里,否则就走不出循环了^0^。具体保证的方法:
// 1、其他线程调用signal方法;2、如果被中断了,在上面中断检查时根据条件判断是否
// 需要调用enq方法塞进队列。如果acquireQueue被中断了不会抛出异常,只是设置中断
// 标识,和synchronized语义一样不会抛出中断异常。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
// 扫描一下条件等待队列,清除被取消的结点
unlinkCancelledWaiters();
if (interruptMode != 0)
// 不等于0说明有异常,需要处理下
reportInterruptAfterWait(interruptMode);
private Node addConditionWaiter()
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION)
unlinkCancelledWaiters();
t = lastWaiter;
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
addConditionWaiter首先检查当前条件队列里尾节点状态是否是CONDITION,如果是,则遍历整个条件队列,删除所有被取消了的结点(状态不是CONDITION的结点);之后,新建一个节点并插入链表最后一个。
private void unlinkCancelledWaiters()
Node t = firstWaiter;
Node trail = null;
while (t != null)
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION)
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
else
trail = t;
t = next;
unlinkCancelledWaiters链表操作,删除所有被取消了的结点(状态不是CONDITION的结点)。这个函数在两个地方会被调用:1、在条件队列里等待时被取消了;2、在条件队列里插入一个新节点时发现尾节点被取消了。
final int fullyRelease(Node node)
try
int savedState = getState();
if (release(savedState))
return savedState;
catch (RuntimeException ex)
node.waitStatus = Node.CANCELLED;
throw ex;
// release失败就会走到这了
node.waitStatus = Node.CANCELLED;
throw new IllegalMonitorStateException();
fullyRelease先获取state的值,把它作为参数调用AQS的release方法,并返回state。如果release失败或抛异常,则取消node,抛出异常。
final boolean isOnSyncQueue(Node node)
// 如果waitStatus还是condition或者node.prev还是null,说明还在条件队列里呢。
// 调用signal函数从条件队列挪到AQS同步队列后,waitStatus会被设置成0的;
// 而且,刚入同步队列时prev肯定不是null(AQS中至少是有一个节点的或者有个dummy
// 头节点)。
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果next域有值了,说明已经挪到AQS队列里了。因为条件队列Node的prev、next域
// 都是null的,它是通过nextWaiter域串接的,是个单向链表。
if (node.next != null)
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// 走到这里说明node.prev肯定不是null的了,但是还不能说明node就是AQS同步队列里
// 。这个移动操作是在signal里进行的(后面会详细分析),会调用enq函数把node插入// 队尾,enq函数会先设置node的prev域,等tail设置成功后才会设置tail的next域。
// 只有tail设置成功我们才认为node是在AQS同步队列里的。因此,考虑到这个并发场景,
// 我们从tail开始寻找node节点。
return findNodeFromTail(node);
isOnSyncQueue函数,如果node之前是在condition的条件队列里的,现在移到了AQS的同步等待队里了,就返回true。
private int checkInterruptWhileWaiting(Node node)
return (Thread.interrupted()) ?
((transferAfterCancelledWait(node))? THROW_IE : REINTERRUPT) :
0;
checkInterruptWhileWaiting函数返回中断状态:0:未被中断;THROW_IE:在入AQS同步队列前(signal之前)被中断了;REINTERRUPT:在入AQS同步队列后(signal之后)被中断了(这种概率比较小)
final boolean transferAfterCancelledWait(Node node)
// 如果还没被signal唤醒就被中断了,需要手动把node插队列里,保证后面的
// acquireQueued函数不会死循环。
if (compareAndSetWaitStatus(node, Node.CONDITION, 0))
enq(node);
return true;
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
// 如果走到这里说明signal和interrupt并发了,signal只设置了node.waitStats,
// 还没来得及执行enq函数。因此,这里要等待signal执行完enq函数,保证后面的
// acquireQueued函数不会死循环。
while (!isOnSyncQueue(node))
Thread.yield();
return false;
transferAfterCancelledWait函数表明在signal之前被中断还是之后被中断,依据就是node.waitStatus域的值,如果此时该域的值还是CONDITION,说明还没有被唤醒;否则,肯定就被唤醒过了。
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
reportInterruptAfterWait函数比较简单,根据传递的参数值决定是抛出InterruptedException异常还是仅仅设置下中断标识。
6.2 signal
public final void signal()
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
signal函数用来唤醒调用condition.wait的线程。如果不是独占的,抛出异常;否则,唤醒条件队列里的第一个结点。
private void doSignal(Node first)
do
// 重置头节点,如果重置后的头结点为null,那么把尾节点也设置成null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 置空旧的头节点的nextWaiter域,因为旧的头节点已经被删除了
first.nextWaiter = null;
// 如果node被取消了(如被中断了),则跳过该节点,循环继续
while (!transferForSignal(first) &&
(first = firstWaiter) != null);
doSignal函数主要就是通知下一个没被取消的结点,并且重置condition的头结点。
final boolean transferForSignal(Node node)
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 在AQS同步队列中插入node节点,并返回前一个节点p
Node p = enq(node);
int c = p.waitStatus;
// 如果节点p被取消了,或者设置p的waitStatus失败了,直接唤醒node节点了
if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
transferForSignal函数功能是将一个节点从条件队列移动到AQS的同步等待队列,并将node.waitStatus置为0。
6.3 awaitUninterruptibly
public final void awaitUninterruptibly()
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node))
LockSupport.park(this);
// 和await不同,这里保存中断状态并清除,不管signal前还是后,继续循环。
if (Thread.interrupted())
interrupted = true;
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
和await结构很类似,主要有两个不同点:1、这函数不响应中断,不会抛出InterruptedException异常;2、函数最后没有清除取消了的结点。
6.4 awaitNanos
public final long awaitNanos(long nanosTimeout) throws InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
long lastTime = System.nanoTime();
int interruptMode = 0;
while (!isOnSyncQueue(node))
if (nanosTimeout <= 0L)
// 如果超时时间小于等于0则执行下面中断检查也执行的操作,然后退出循环。执行到这里
// 大多数情况下是还没来得及signal的。
transferAfterCancelledWait(node);
break;
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return nanosTimeout - (System.nanoTime() - lastTime);
awaitNanos函数支持超时,并响应中断,返回剩余的时间(入参的值减去消耗的时间)。
6.5 带参数的await
和awaitNanos很像,不过返回值是boolean。
public final boolean await(long time, TimeUnit unit) throws InterruptedException
if (unit == null)
throw new NullPointerException();
// 单位换算成纳秒
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
long lastTime = System.nanoTime();
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node))
if (nanosTimeout <= 0L)
// timout默认是false,只有这里设置了值(nanosTimeout<=0时)
timedout = transferAfterCancelledWait(node);
break;
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
6.6 awaitUtil
返回值也是boolean,和带参数的await方法类似,不同点是通过LockSupport的parkUtil方法进行阻塞的。
6.7 signalAll
public final void signalAll()
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
private void doSignalAll(Node first)
// 头结点、尾节点都置空,都删了
lastWaiter = firstWaiter = null;
do
Node next = first.nextWaiter;
first.nextWaiter = null;
// 转移到同步队列里了
transferForSignal(first);
first = next;
while (first != null);
doSignalAll函数会把condition的条件队列里所有的结点删除掉,并转移到AQS的等待队列里。
以上是关于Java 1.6 AbstractQueuedSynchronizer源码解析的主要内容,如果未能解决你的问题,请参考以下文章
如何在使用 1.6 运行基本代码时使用 Java 1.7 编写 Junit