Java 1.6 AbstractQueuedSynchronizer源码解析

Posted Mr-yuenkin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 1.6 AbstractQueuedSynchronizer源码解析相关的知识,希望对你有一定的参考价值。

由于本人水平与表达能力有限,有错误的地方欢迎交流与指正。

1 简介

AQS开放了几个方法交由子类实现(本类中抛出UnsupportedOperationException),分别是:

tryAcquire

tryRelease

tryAcquireShared

tryReleaseShared

isHeldExclusively

子类需要实现上面的5个方法,注意这个方法内部需要考虑线程安全问题。看看JDK1.7下AQS的子类:


   AQS是Java并发框架的一个基础组件,JUC下很多类都是基于它实现的,如:ReentrantLock、ReentrantReadWriterLock、信号量、线程池等。AQS有两种工作模式:独占模式和共享模式。这两种模式典型的应用可以ReentrantReadWriterLock为例:如果一个线程(写线程)以独占模式(acquire)获取到这个锁,那么其他线程(读或写线程)不管以哪种模式尝试获取锁(共享模式是通过acquireShared获取的),都会失败;如果一个线程(读线程)以共享模式获取到这个锁,那么其他读线程可以获取到锁,而写线程会获取失败。


2 Node内部类

static final class Node 
        /** 表示取消状态*/
        static final int CANCELLED =  1;
        /** 表示后面有结点需要unPark*/
        static final int SIGNAL    = -1;
        /** 表示当前结点在Condition的条件队列里*/
        static final int CONDITION = -2;
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node until
         *               transferred. (Use of this value here
         *               has nothing to do with the other uses
         *               of the field, but simplifies mechanics.)
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified only using
         * CAS.
         */
        volatile int waitStatus;

  
/**
         * 入队的时候初始化赋值,出队的时候置空。如果前驱结点状态是CANCELED,
         * 那么需要通过prev域寻找一个非CANCELED的结点,将其next域直接指向
         * 后面一个非CANCELED结点。总会有一个这样的结点的,至少头节点就是,
         * 成为头节点只有一种可能:acquire成功了!如果结点被取消了,那它永远
         * 不会acquire成功,而且一个节点只能取消自己对应的结点,不能取消其他的
         */
        volatile Node prev;
        
/**
         * 一旦当前结点调用release方法,就会unPark这个next结点。在入队列的时
         * 候设置这个域,不用的时候设置成null。一旦取消当前结点,我们不能设置
         * 这个域(但是我们发现cancelAcquire函数里是把next指向自己的),通过
* 把状态修改成CANCELED,这样就能绕过这个结点了。在enq函数中,仅当CAS
         * 设置tail成功时才会设置next域。因此,如果某个结点的next域为null,并
         * 不能说明这个结点就是tail。然而,如果我们发现某个结点的next域为null,
         * 那需要做个double-check(从tail开始,利用prev向前进行遍历)。
*/
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

/**
         * 这个域是Condition条件队列使用的,这个队列是个单向链表,指向下一个节点
         * 。或者值是SHARED,表示共享模式。由于条件队列只能在独占模式下访问,所以
         * 我们只需要一个单向链表就可以保存等待的结点了(ConditionObject类)。
         * 这些节点会转移到AQS的同步等待队列里,然后重新acquire。
         */

        Node nextWaiter;

3 独占模式

3.1 acquire

public final void acquire(int arg) 
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 如果获取锁过程中(park时)被中断过,现在恢复中断
            selfInterrupt();
    

独占模式下进行请求,忽略中断。方法实现中至少会调用一次tryAcquire方法, 请求成功后方法返回。否则当前线程会排队,可能会重复的阻塞和解除阻塞, 执行tryAcquire方法,直到成功。这个方法可以用来实现Lock的lock方法。

private Node addWaiter(Node mode) 
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) 
            // 先设置了node.prev,但是pred.next要在compareAndSetTail执行
// 成功后才设置的
            node.prev = pred;
            if (compareAndSetTail(pred, node)) 
                pred.next = node;
                return node;
            
        
        enq(node);
        return node;
    
private Node enq(final Node node) 
        for (;;) 
            Node t = tail;
            if (t == null)  // Must initialize
                Node h = new Node(); // Dummy header
                h.next = node;
                node.prev = h;
                if (compareAndSetHead(h)) 
                    tail = node;
                    return h;
                
            
            else 
                node.prev = t;
                if (compareAndSetTail(t, node)) 
                    t.next = node;
                    return t;
                
            
        
    

addWaiter就是创建一个准备入队的node对象并将新节点放入队列尾部。先尝试较短的路径(tail不为空的场景),其实这个代码和enq函数的else分支的代码是一样的;如果尝试成功就直接return,否则,进入enq方法了。从enq方法可以看出,开始时这个队列有个冗余的首结点,执行一段时间后这个首节点表示的是当前获取到锁的结点。

/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node 当前线程自旋的结点
     * @param arg the acquire argument
     * @return @code true if interrupted while waiting
     */
final boolean acquireQueued(final Node node, int arg) 
        try 
            boolean interrupted = false;
            for (;;) 
                final Node p = node.predecessor();
                // 可以体现出是FIFO,只有第一个等待的Node才有可能获取锁
                if (p == head && tryAcquire(arg)) 
                    // 如果锁定成功,则将当前结点设置为队列的头节点
// 且将node的thread、prev域置为null(头节点是dummy节
// 点,同时如下的help GC)。
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                
                // 如果满足park条件就休眠了(如果锁可用的话,前面的Node会通
// 知),节省CPU;要是休眠过程中(LockSupport.park)被中断了,
// 那么会清除中断标识继续循环。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            
         catch (RuntimeException ex) 
            cancelAcquire(node);
            throw ex;
        
    

acquireQueued函数就是一个循环,不断尝试获取锁直至获取成功,中间可能会阻塞(park),但是不响应中断(中断后继续循环)。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 
        int s = pred.waitStatus;
        if (s < 0)
            return true;
        if (s > 0) 
            // 前面连续的CANCEL状态的Node需要删除掉
            // 不会出现前面所有的结点都是CANCEL的,因为头节点已经获取到锁了,
// 是正在运行中的。因此,最后的pred不会为空。
	    do 
		node.prev = pred = pred.prev;
	     while (pred.waitStatus > 0);
	    pred.next = node;
	
        else
            // 本次循环不需要park,此时将pred节点状态设置成SIGNAL;
            // 第二次循环再进来的时候就需要park了
            compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
        return false;
    

shouldParkAfterFailedAcquire函数作用:当获取锁失败时判断当前线程是否需要挂起来。其实,除非获取锁成功,否则都会挂起来的(基本就是两次机会)。

  private final boolean parkAndCheckInterrupt() 
        LockSupport.park(this);
        // 清除中断标识,并返回清除前的标识
        return Thread.interrupted();
    

acquireQueued异常时会调用cancelAcquire方法:

private void cancelAcquire(Node node) 
	// Ignore if node doesn't exist
        if (node == null)
	    return;

	node.thread = null;

	// 跳过CANCEL状态的结点
	Node pred = node.prev;
	while (pred.waitStatus > 0)
	    node.prev = pred = pred.prev;

	// Getting this before setting waitStatus ensures staleness
      // pred不会为空的,因为头节点是在运行中的,肯定满足条件
	Node predNext = pred.next;

	// 这里没有使用CAS操作,直接赋值了确保是CANCELED
	node.waitStatus = Node.CANCELLED;

	// 如果是尾节点,尝试删除node;
// 删除失败了也没关系,因为状态已经设置成CANCEL了。
	if (node == tail && compareAndSetTail(node, pred)) 
	    compareAndSetNext(pred, predNext, null);
	 else 
          // 如果pred不是头节点且尝试设置pred.waitStatus=SIGNAL成功了
          // 则尝试把pred.next链接到node.next上
	    if (pred != head
		&& (pred.waitStatus == Node.SIGNAL
		    || compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
		&& pred.thread != null) 

		// If successor is active, set predecessor's next link
		Node next = node.next;
		if (next != null && next.waitStatus <= 0)
		    compareAndSetNext(pred, predNext, next);
	     else 
            // (如果pred是头节点)或者(pred不是头节点但是设置
// pred.waitStatus=SIGNAL失败了),则需要给后面等待锁的线程发个
// unPark信号。pred不是头节点也给后面的Node发unPark信号也没有问
// 题,线程唤醒后还会继续检查状态的,如果还没轮到它还会继续park的。
		unparkSuccessor(node);
	    

	    node.next = node; // help GC
	
    

这个函数有些小复杂,有很多CAS操作,这些操作就执行一次,失败就失败了,要细细体会。

private void unparkSuccessor(Node node) 
        /*
         * Try to clear status in anticipation of signalling.  It is
         * OK if this fails or if status is changed by waiting thread.
         */
        compareAndSetWaitStatus(node, Node.SIGNAL, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        // 需要唤醒后面没有被取消的结点;从上面的enq函数我们知道如果结点的next
// 域为null并不能说明它就是tail结点,因此,我们要从tail结点反过来查找
// 距离node最近的不是CANCELED结点。最后,unPark这个结点。
        Node s = node.next;
        if (s == null || s.waitStatus > 0) 
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        
        if (s != null)
            LockSupport.unpark(s.thread);
    

3.2 release

独占模式下的释放方法,比较简单,如果tryRelease成功了且头结点不空且waitStatus!=0就唤醒后面排队的结点:

public final boolean release(int arg) 
        if (tryRelease(arg)) 
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        
        return false;
    

3.3 acquireInterruptibly

独占模式,acquire的响应中断的版本,他俩结构很像先tryAcquire,再循环。

// 函数会抛出InterruptedException异常
public final void acquireInterruptibly(int arg) throws InterruptedException 
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            // 和acquireQueued很像
            doAcquireInterruptibly(arg);
    
private void doAcquireInterruptibly(int arg)
        throws InterruptedException 
        final Node node = addWaiter(Node.EXCLUSIVE);
        try 
            for (;;) 
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) 
                    setHead(node);
                    p.next = null; // help GC
                    return;
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    break;
            
         catch (RuntimeException ex) 
            cancelAcquire(node);
            throw ex;
        
        // Arrive here only if interrupted
        // 这里和acquireQueued不一样,被中断后先cancelAcquire,再出抛异常
        cancelAcquire(node);
        throw new InterruptedException();
    

3.4 tryAcquireNanos

    独占模式,响应中断且还支持超时,acquireInterruptibly的升级版,结构也很像。

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException 
	if (Thread.interrupted())
	    throw new InterruptedException();
	return tryAcquire(arg) ||
	    doAcquireNanos(arg, nanosTimeout);
    
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException 
        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.EXCLUSIVE);
        try 
            for (;;) 
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) 
                    setHead(node);
                    p.next = null; // help GC
                    return true;
                
// 不支持<=0的值,直接取消并返回false
                if (nanosTimeout <= 0) 
                    cancelAcquire(node);
                    return false;
                
// 如果时间<=1000纳秒,继续循环
                if (nanosTimeout > spinForTimeoutThreshold &&
                    shouldParkAfterFailedAcquire(p, node))
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
// 注意:如果此时修改系统时间,会引起误差。系统时间调大可能就直
// 接被cancel或者park的时间变小了;系统时间调小,park的时间
// 就变大了
                nanosTimeout -= now - lastTime;
                lastTime = now;
// 如果被中断了,就退出循环进入后面的取消流程
                if (Thread.interrupted())
                    break;
            
         catch (RuntimeException ex) 
            cancelAcquire(node);
            throw ex;
        
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    

4 共享模式

4.1 acquireShared

    共享模式,忽略中断,一样的思路,先try一下,如果失败了就入队列。有个小区别:tryAcquireShared返回值小于0表示失败,而不再是简单的boolean了。

public final void acquireShared(int arg) 
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    
private void doAcquireShared(int arg) 
// 这里是SHARED,而acquire是EXCLUSIVE。
        final Node node = addWaiter(Node.SHARED);
        try 
            boolean interrupted = false;
            for (;;) 
                final Node p = node.predecessor();
                if (p == head) 
                    int r = tryAcquireShared(arg);
                    if (r >= 0) 
// 重置head并传播给后面的共享结点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
// 如果循环过程中被中断了,恢复中断标识
                        if (interrupted)
                            selfInterrupt();
                        return;
                    
                
// 如果满足park条件就休眠了(如果锁可用的话,前面的Node会通
// 知),节省CPU;要是休眠过程中(LockSupport.park)被中断了,
// 那么会清除中断标识继续循环。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            
         catch (RuntimeException ex) 
            cancelAcquire(node);
            throw ex;
        
    
private void setHeadAndPropagate(Node node, int propagate) 
// 重置head
        setHead(node);
        if (propagate > 0 && node.waitStatus != 0) 
            /*
             * 如果s为null,这里就简单化处理了,也直接调用unparkSuccessor方
* 法,不要用复杂的算法计算出第一个shared后继结点了。
             */
            Node s = node.next;
            if (s == null || s.isShared())
                unparkSuccessor(node);
        
    

4.2 releaseShared

    和release结构很像,除了第一行有个try:

public final boolean releaseShared(int arg) 
        if (tryReleaseShared(arg)) 
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        
        return false;
    

4.3 acquireSharedInterruptibly

    共享模式,响应中断,acquireShared的升级版,函数结构也很类似:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException 
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    
该函数也会抛出InterruptedException异常。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException 
        final Node node = addWaiter(Node.SHARED);
        try 
            for (;;) 
                final Node p = node.predecessor();
                if (p == head) 
                    int r = tryAcquireShared(arg);
                    if (r >= 0) 
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
// 不同点:这里会跳出循环,然后cancel并抛出异常
                    break;
            
         catch (RuntimeException ex) 
            cancelAcquire(node);
            throw ex;
        
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    

4.4 tryAcquireSharedNanos

    共享模式,响应中断,支持超时时间设置,acquireSharedInterruptibly的升级版:

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException 
	if (Thread.interrupted())
	    throw new InterruptedException();
	return tryAcquireShared(arg) >= 0 ||
	    doAcquireSharedNanos(arg, nanosTimeout);
    
  可以看到,带超时参数的获取锁方法都带有boolean返回值的,表示获取锁成功还是失败。doAcquireSharedNanos思路跟上面的doAcquireNanos类似,这里就不重复介绍了,代码在下面:

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException 

        long lastTime = System.nanoTime();
// 这里是SHARED
        final Node node = addWaiter(Node.SHARED);
        try 
            for (;;) 
                final Node p = node.predecessor();
                if (p == head) 
                    int r = tryAcquireShared(arg);
                    if (r >= 0) 
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return true;
                    
                
// 不支持<=0的值,直接取消并返回false
                if (nanosTimeout <= 0) 
                    cancelAcquire(node);
                    return false;
                
// 如果时间<=1000纳秒,继续循环
                if (nanosTimeout > spinForTimeoutThreshold &&
                    shouldParkAfterFailedAcquire(p, node))
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
// 注意:如果此时修改系统时间,会引起误差。系统时间调大可能就直
// 接被cancel或者park的时间变小了;系统时间调小,park的时间
// 就变大了
                nanosTimeout -= now - lastTime;
                lastTime = now;
// 如果被中断了,就退出循环进入后面的取消流程
                if (Thread.interrupted())
                    break;
            
         catch (RuntimeException ex) 
            cancelAcquire(node);
            throw ex;
        
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    

5 其他方法

5.1 hasQueuedThreads

    查询队列里是否有线程在等待,由于中断和超时引起的取消操作随时都可能发生,这方法不一定实时准确。

public final boolean hasQueuedThreads() 
        return head != tail;
    

5.2 hasContended

    查询队列是否被多个acquire请求竞争过(导致某个线程阻塞过)。为什么head 不为null就能证明?有竞争就会入队列此时head不为null,但是任务执行完了呢?通过上面的代码知道,head是由队列里刚获得到锁的线程设置的 (把自己设置成head),即使任务执行完也不会修改head,只能由下个入队的线程设置,这样head就永远不会为空了。

public final boolean hasContended() 
        return head != null;
    

5.3 getFirstQueuedThread

    返回队列里第一个没有获取到锁的线程,如果head等于tail说明队列里没有线程在等待,直接返回null;否则,调用fullGetFirstQueuedThread。

  public final Thread getFirstQueuedThread() 
        // handle only fast path, else relay
        return (head == tail)? null : fullGetFirstQueuedThread();
    
private Thread fullGetFirstQueuedThread() 
        /*
         * The first node is normally h.next. Try to get its
         * thread field, ensuring consistent reads: If thread
         * field is nulled out or s.prev is no longer head, then
         * some other thread(s) concurrently performed setHead in
         * between some of our reads. We try this twice before
         * resorting to traversal.
         */
        // 一般来说这个线程就是head->next,需要保证在并发情况下读一致性:
// 1. (h = head) != null
// 2. (s = h.next) != null
// 3. s.prev = head
// 4. (st = s.thread) != null
// 假设2、3中间被并发的插入了一个setHead方法,执行3时发现s.prev为空了
// 因此,这里需要再试一次(第二次其实也有可能会失败,不过概率已经很小
// 了,就像连续被雷劈两次一样;即使第二次也失败了,还有后面最后一道安
// 全措施,从tail开始向前遍历寻找)。
        Node h, s;
        Thread st;
        if (((h = head) != null && (s = h.next) != null &&
             s.prev == head && (st = s.thread) != null) ||
            ((h = head) != null && (s = h.next) != null &&
             s.prev == head && (st = s.thread) != null))
            return st;

        /*
         * Head's next field might not have been set yet, or may have
         * been unset after setHead. So we must check to see if tail
         * is actually first node. If not, we continue on, safely
         * traversing from tail back to head to find first,
         * guaranteeing termination.
         */
// head.next也有可能被并发修改,比如上面的1、2中间插入了一个acquireQueued
// 方法,执行完setHeader方法后将head.next置为null,这样条件2就不成立了,
// 因此,就进入下面的最后一道程序了。
        Node t = tail;
        Thread firstThread = null;
        while (t != null && t != head) 
            Thread tt = t.thread;
            if (tt != null)
                firstThread = tt;
            t = t.prev;
        
        return firstThread;
    

5.4 isQueued

判断线程是否在队列里(包含头节点了),跟getFirstQueuedThread不一样,没有先从head开始找,直接从tail开始反向搜索,很直接。因为getFirstQueuedThread要找的是第一个,从head开始找效率比较高,从tail开始反向遍历是因为没有其他更好的方法了。isQueued不一样,它是找一个节点,反正都要遍历一遍,从head或tail都一样,时间复杂度都是O(n)。

public final boolean isQueued(Thread thread) 
        if (thread == null)
            throw new NullPointerException();
        for (Node p = tail; p != null; p = p.prev)
            if (p.thread == thread)
                return true;
        return false;
    

5.5 apparentlyFirstQueuedIsExclusive

    队列里第一个等待的结点是否是独占模式(不知道这里为啥没考虑并发情况,没有从tail开始遍历)。

final boolean apparentlyFirstQueuedIsExclusive() 
        Node h, s;
        return ((h = head) != null && (s = h.next) != null &&
                s.nextWaiter != Node.SHARED);
    

5.6 isFirst

    当前线程是否是队列里的第一个元素。

final boolean isFirst(Thread current) 
        Node h, s;
        return ((h = head) == null ||
                ((s = h.next) != null && s.thread == current) ||
                fullIsFirst(current));
    
final boolean fullIsFirst(Thread current) 
        // same idea as fullGetFirstQueuedThread
        Node h, s;
        Thread firstThread = null;
        if (((h = head) != null && (s = h.next) != null &&
             s.prev == head && (firstThread = s.thread) != null))
            return firstThread == current;
        Node t = tail;
        while (t != null && t != head) 
            Thread tt = t.thread;
            if (tt != null)
                firstThread = tt;
            t = t.prev;
        
        return firstThread == current || firstThread == null;
    

5.7 getQueueLength

    队列长度,不包括已经取消的和头节点,因为它俩的thread域都为null。

public final int getQueueLength() 
        int n = 0;
        for (Node p = tail; p != null; p = p.prev) 
            if (p.thread != null)
                ++n;
        
        return n;
    

6 ConditionObject内部类

Condition的使用与原理可以参考其他资料:http://blog.csdn.net/ghsau/article/details/7481142

6.1 await

支持中断的等待方法:

public final void await() throws InterruptedException 
            if (Thread.interrupted())
                throw new InterruptedException();
// 创建一个节点并将其加入condition的等待列表里,如果最后一个节点的waitStatus不
// 是CONDITION,需要将其移除。
            Node node = addConditionWaiter();
// 模拟synchronized语义,在wait前要先释放对象锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
// 模拟obj.wait语义,阻塞自己
            while (!isOnSyncQueue(node)) 
                LockSupport.park(this);
// 检查下是否有中断,如果有,还要区分下是signal前还是signal后。signal前中断的
// 的话要抛出InterruptedException,否则,只需要设置中断标识。怎么判断是前还是
// 后?依据就是node.waitStatus域,如果还是CONDITION,说明还没被signal;如果
// 是0,说明已经被signal函数修改过了。
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            
// 模拟synchronized语义,obj.wait被唤醒后要重新获取obj对象锁。
// 执行到这里说明node已经在AQS队列里,否则就走不出循环了^0^。具体保证的方法:
// 1、其他线程调用signal方法;2、如果被中断了,在上面中断检查时根据条件判断是否
// 需要调用enq方法塞进队列。如果acquireQueue被中断了不会抛出异常,只是设置中断
// 标识,和synchronized语义一样不会抛出中断异常。
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
// 扫描一下条件等待队列,清除被取消的结点
                unlinkCancelledWaiters();
            if (interruptMode != 0)
// 不等于0说明有异常,需要处理下
                reportInterruptAfterWait(interruptMode);
        
private Node addConditionWaiter() 
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) 
                unlinkCancelledWaiters();
                t = lastWaiter;
            
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        

addConditionWaiter首先检查当前条件队列里尾节点状态是否是CONDITION,如果是,则遍历整个条件队列,删除所有被取消了的结点(状态不是CONDITION的结点);之后,新建一个节点并插入链表最后一个。

private void unlinkCancelledWaiters() 
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) 
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) 
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                
                else
                    trail = t;
                t = next;
            
        
   unlinkCancelledWaiters链表操作,删除所有被取消了的结点(状态不是CONDITION的结点)。这个函数在两个地方会被调用:1、在条件队列里等待时被取消了;2、在条件队列里插入一个新节点时发现尾节点被取消了。

final int fullyRelease(Node node) 
        try 
            int savedState = getState();
            if (release(savedState))
                return savedState;
         catch (RuntimeException ex) 
            node.waitStatus = Node.CANCELLED;
            throw ex;
        
        // release失败就会走到这了
        node.waitStatus = Node.CANCELLED;
        throw new IllegalMonitorStateException();
    

 fullyRelease先获取state的值,把它作为参数调用AQS的release方法,并返回state。如果release失败或抛异常,则取消node,抛出异常。

final boolean isOnSyncQueue(Node node) 
// 如果waitStatus还是condition或者node.prev还是null,说明还在条件队列里呢。
// 调用signal函数从条件队列挪到AQS同步队列后,waitStatus会被设置成0的;
// 而且,刚入同步队列时prev肯定不是null(AQS中至少是有一个节点的或者有个dummy
// 头节点)。
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
// 如果next域有值了,说明已经挪到AQS队列里了。因为条件队列Node的prev、next域
// 都是null的,它是通过nextWaiter域串接的,是个单向链表。
        if (node.next != null)
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
// 走到这里说明node.prev肯定不是null的了,但是还不能说明node就是AQS同步队列里
// 。这个移动操作是在signal里进行的(后面会详细分析),会调用enq函数把node插入// 队尾,enq函数会先设置node的prev域,等tail设置成功后才会设置tail的next域。
// 只有tail设置成功我们才认为node是在AQS同步队列里的。因此,考虑到这个并发场景,
// 我们从tail开始寻找node节点。
        return findNodeFromTail(node);
    
   isOnSyncQueue函数,如果node之前是在condition的条件队列里的,现在移到了AQS的同步等待队里了,就返回true。

private int checkInterruptWhileWaiting(Node node) 
            return (Thread.interrupted()) ?
                ((transferAfterCancelledWait(node))? THROW_IE : REINTERRUPT) :
                0;
        

checkInterruptWhileWaiting函数返回中断状态:0:未被中断;THROW_IE:在入AQS同步队列前(signal之前)被中断了;REINTERRUPT:在入AQS同步队列后(signal之后)被中断了(这种概率比较小)

final boolean transferAfterCancelledWait(Node node) 
// 如果还没被signal唤醒就被中断了,需要手动把node插队列里,保证后面的
// acquireQueued函数不会死循环。
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) 
            enq(node);
            return true;
        
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
// 如果走到这里说明signal和interrupt并发了,signal只设置了node.waitStats,
// 还没来得及执行enq函数。因此,这里要等待signal执行完enq函数,保证后面的
// acquireQueued函数不会死循环。
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    

transferAfterCancelledWait函数表明在signal之前被中断还是之后被中断,依据就是node.waitStatus域的值,如果此时该域的值还是CONDITION,说明还没有被唤醒;否则,肯定就被唤醒过了。

private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException 
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        

reportInterruptAfterWait函数比较简单,根据传递的参数值决定是抛出InterruptedException异常还是仅仅设置下中断标识。

6.2 signal

public final void signal() 
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        

signal函数用来唤醒调用condition.wait的线程。如果不是独占的,抛出异常;否则,唤醒条件队列里的第一个结点。

private void doSignal(Node first) 
            do 
// 重置头节点,如果重置后的头结点为null,那么把尾节点也设置成null
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
// 置空旧的头节点的nextWaiter域,因为旧的头节点已经被删除了
                first.nextWaiter = null;
// 如果node被取消了(如被中断了),则跳过该节点,循环继续
             while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        
doSignal函数主要就是通知下一个没被取消的结点,并且重置condition的头结点。

final boolean transferForSignal(Node node) 
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

// 在AQS同步队列中插入node节点,并返回前一个节点p
        Node p = enq(node);
        int c = p.waitStatus;
// 如果节点p被取消了,或者设置p的waitStatus失败了,直接唤醒node节点了
        if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    

transferForSignal函数功能是将一个节点从条件队列移动到AQS的同步等待队列,并将node.waitStatus置为0。

6.3 awaitUninterruptibly

public final void awaitUninterruptibly() 
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) 
                LockSupport.park(this);
// 和await不同,这里保存中断状态并清除,不管signal前还是后,继续循环。
                if (Thread.interrupted())
                    interrupted = true;
            
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        

和await结构很类似,主要有两个不同点:1、这函数不响应中断,不会抛出InterruptedException异常;2、函数最后没有清除取消了的结点。

6.4 awaitNanos

public final long awaitNanos(long nanosTimeout) throws InterruptedException 
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            long lastTime = System.nanoTime();
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) 
                if (nanosTimeout <= 0L) 
// 如果超时时间小于等于0则执行下面中断检查也执行的操作,然后退出循环。执行到这里
// 大多数情况下是还没来得及signal的。
                    transferAfterCancelledWait(node);
                    break;
                
                LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;

                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
            
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return nanosTimeout - (System.nanoTime() - lastTime);
        

awaitNanos函数支持超时,并响应中断,返回剩余的时间(入参的值减去消耗的时间)。

6.5 带参数的await

和awaitNanos很像,不过返回值是boolean。

  public final boolean await(long time, TimeUnit unit) throws InterruptedException 
            if (unit == null)
                throw new NullPointerException();
// 单位换算成纳秒
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            long lastTime = System.nanoTime();
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) 
                if (nanosTimeout <= 0L) 
// timout默认是false,只有这里设置了值(nanosTimeout<=0时)
                    timedout = transferAfterCancelledWait(node);
                    break;
                
                LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
            
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        

6.6 awaitUtil

      返回值也是boolean,和带参数的await方法类似,不同点是通过LockSupport的parkUtil方法进行阻塞的。

6.7 signalAll

public final void signalAll() 
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        
private void doSignalAll(Node first) 
// 头结点、尾节点都置空,都删了
            lastWaiter = firstWaiter  = null;
            do 
                Node next = first.nextWaiter;
                first.nextWaiter = null;
// 转移到同步队列里了 
                transferForSignal(first);
                first = next;
             while (first != null);
        
  doSignalAll函数会把condition的条件队列里所有的结点删除掉,并转移到AQS的等待队列里。







以上是关于Java 1.6 AbstractQueuedSynchronizer源码解析的主要内容,如果未能解决你的问题,请参考以下文章

java 1.6 可以使用pageholder吗

使用 Java 1.6 通过 JMS 连接到 SQS

如何在使用 1.6 运行基本代码时使用 Java 1.7 编写 Junit

已更改为 1.6

markdown oracle ojdbc驱动java jdk 1.6兼容

Java 1.6:公共变量不起作用