Java并发控制的基础类AbstractQueuedSynchronizer的实现原理简介

Posted 蜀中孤鹰

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发控制的基础类AbstractQueuedSynchronizer的实现原理简介相关的知识,希望对你有一定的参考价值。

1.引子

Lock接口的主要实现类ReentrantLock 内部主要是利用一个Sync类型的成员变量sync来委托Lock锁接口的实现,而Sync继承于AbstractQueuedSynchronizer,且大多数java.util.concurrent包下的并发工具类都是利用AbstractQueuedSynchronizer同步器来委托实现的,它是用来构建锁或者其他同步组件的基础框架。要想弄明白并发的原理,必须先搞清楚AbstractQueuedSynchronizer的实现机制。

可以看出 AbstractQueuedSynchronizer的直接父类是AbstractOwnableSynchronizer。

 AbstractOwnableSynchronizer的类定义

    public abstract class AbstractOwnableSynchronizer
            implements java.io.Serializable {

        private static final long serialVersionUID = 3737899427754241961L;

        protected AbstractOwnableSynchronizer() {
        }

        private transient Thread exclusiveOwnerThread;

        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }

        protected final Thread getExclusiveOwnerThread() {
            return exclusiveOwnerThread;
        }
    }

 

 这个类中只有Thread类型exclusiveOwnerThread成员变量的一对setter/getter方法,用来设置/获取独占线程,正如其名中的“Ownable",此类就是"一个线程可能专有的同步器。"这一对setter/getter对于独占锁类型的并发工具特别有用。而ReentrantLock是一个可重入排他锁(排他锁又称独占锁),因此ReentrantLock这类中常用到这两个方法。

2.同步器的基本用法

AbstractQueuedSynchronizer的类部分注释如下:

 

英文直译的意思是:此类提供用于实现阻塞锁及相关功能的框架依靠先进先出(FIFO)等待队列的同步器(semaphores, events, 等)。 为大多数依赖单个int类型原子值表示状态的同步器提供了实现基础。 子类必须定义更改此状态的受保护方法,并定义该状态对于获取或释放此对象而言意味着什么。 鉴于这些,此类中的其他方法将执行所有排队和阻塞机制。 子类可以维护其他状态字段,但是在同步方面仅跟踪使用方法{@link #getState},{@ link #setState}和{@ link#compareAndSetState}操作的原子更新的{@code int}值。

 * Provides a framework for implementing blocking locks and related
 * synchronizers (semaphores, events, etc) that rely on
 * first-in-first-out (FIFO) wait queues.  This class is designed to
 * be a useful basis for most kinds of synchronizers that rely on a
 * single atomic {@code int} value to represent state. Subclasses
 * must define the protected methods that change this state, and which
 * define what that state means in terms of this object being acquired
 * or released.  Given these, the other methods in this class carry
 * out all queuing and blocking mechanics. Subclasses can maintain
 * other state fields, but only the atomically updated {@code int}
 * value manipulated using methods {@link #getState}, {@link
 * #setState} and {@link #compareAndSetState} is tracked with respect
 * to synchronization.
 *
 * <p>Subclasses should be defined as non-public internal helper
 * classes that are used to implement the synchronization properties
 * of their enclosing class.  Class
 * {@code AbstractQueuedSynchronizer} does not implement any
 * synchronization interface.  Instead it defines methods such as
 * {@link #acquireInterruptibly} that can be invoked as
 * appropriate by concrete locks and related synchronizers to
 * implement their public methods.
 *
 * <p>This class supports either or both a default <em>exclusive</em>
 * mode and a <em>shared</em> mode. When acquired in exclusive mode,
 * attempted acquires by other threads cannot succeed. Shared mode
 * acquires by multiple threads may (but need not) succeed. This class
 * does not &quot;understand&quot; these differences except in the
 * mechanical sense that when a shared mode acquire succeeds, the next
 * waiting thread (if one exists) must also determine whether it can
 * acquire as well. Threads waiting in the different modes share the
 * same FIFO queue. Usually, implementation subclasses support only
 * one of these modes, but both can come into play for example in a
 * {@link ReadWriteLock}. Subclasses that support only exclusive or
 * only shared modes need not define the methods supporting the unused mode.
 *
 * <p>This class defines a nested {@link ConditionObject} class that
 * can be used as a {@link Condition} implementation by subclasses
 * supporting exclusive mode for which method {@link
 * #isHeldExclusively} reports whether synchronization is exclusively
 * held with respect to the current thread, method {@link #release}
 * invoked with the current {@link #getState} value fully releases
 * this object, and {@link #acquire}, given this saved state value,
 * eventually restores this object to its previous acquired state.  No
 * {@code AbstractQueuedSynchronizer} method otherwise creates such a
 * condition, so if this constraint cannot be met, do not use it.  The
 * behavior of {@link ConditionObject} depends of course on the
 * semantics of its synchronizer implementation.
 *
 * <p>This class provides inspection, instrumentation, and monitoring
 * methods for the internal queue, as well as similar methods for
 * condition objects. These can be exported as desired into classes
 * using an {@code AbstractQueuedSynchronizer} for their
 * synchronization mechanics.
 *
 * <p>Serialization of this class stores only the underlying atomic
 * integer maintaining state, so deserialized objects have empty
 * thread queues. Typical subclasses requiring serializability will
 * define a {@code readObject} method that restores this to a known
 * initial state upon deserialization.
主要英文类注释

用通俗的话来说:同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件。

使用技巧:

同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。而重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态

·getState():获取当前同步状态。

·setState(int newState):设置当前同步状态。

·compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。

可以看出,可父类同步器重写的主要方法有tryAcquire(arg)、 tryRelease(int)  、isHeldExcluseively()   、 tryAcquireShared(int) 、 tryReleaseShared(int)这五个,父类同步器AQS中的这5个方法不是抽象方法,是一个实例方法,但方法体中只有一行代码“throw new UnsupportedOperationException()”,如果不被重写则会实际调用父类的方法,将会直接报出异常。一般情况下,前两个方法是在排他锁中需要重写的方法(通常第3个方法”isHeldExcluseively()“也会被重写,将判断当前同步器是否在独占模式下被线程占用),而后两个方法是在共享锁中需要被重写,但基本上不可能同时重写这5个方法,因为一个锁不可能既是排他锁又是共享锁。

 

同步器可重写的方法:
方法名 描述
boolean  tryAcquire(int) 独占式获取同步状态,实现此方法需要查询当前状态并判断同步状是否符合预期,然后再进行CAS设置同步状态
 boolean tryRelease(int) 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
boolean isHeldExclusively() 当前同步器是否在独占模式下被线程占用,一般此方法表示是否被当前线程所独占
int tryAcquireShared(int) 共享式获取同步状态,反加大于等于0(等于0表示下个等待节点可能获取锁失败,大于0表示后面的等待节点获取锁很可能成功),表示获取成功,反之,获取失败。
boolean tryReleaseShared(int) 共享式释放同步状态

 

 同步器AQS中的模板方法:

这些模板方法的方法体内将会调用以上5个可被重写的方法,

而自定义同步组件在实现一些同步功能时一般会使用委托的方式调用AQS的这些模板方法,而不会直接去调用上面AQS的那5个可重写的方法。当然也有例外,如ReentrantLock的tryLock()方法直接调用”boolean  tryAcquire(int)“

方法 说明
void acquire(int) 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进人同步队列等待,该方法将会调用重写的tryAcquire(int)方法
void acquirelnterruptibly(int) 与acquire(int arg)相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException并返回
boolean tryAcquireNanos(int ,long) 在acquireTnterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false,如果获取到了返回true
boolean release(int)

独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒

void acquireShared(int) 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态
void acquireSharedInterruptibly(int) 与acquireShared(int )相同,该方法响应中断
boolean tryAcquireSharedNanos(int,long) 在acquireSharedInterruptibly(int)基础上增加了超时限制
boolean releaseShared(int) 共享式的释放同步状态 
getQueuedThreads() 获取等待在同步队列上的线程集合

 AbstractQueuedSynchronizer类注释中的排他锁示例

class Mutex implements Lock, java.io.Serializable {
    /**
     * 如果state是0,表明锁没有被其他线程抢占,可以获取锁
     * 如果state是1,表明锁被某个线程抢占了
     */
    private static class Sync extends AbstractQueuedSynchronizer {
        //锁的状态,即是否被某个抢占了
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        //如果state是0,尝试抢锁
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            /*
             *cas原子操作地修改state的值,修改成功,表示抢锁成功
             * 则将当前线程设置为独占所有者线程
             */
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 将state重置为0
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            //如果state为0,表明没有线程获取到锁,那么就不存在释放锁这各说法,所以管程状态错误
            if (getState() == 0) throw new IllegalMonitorStateException();
            //独占锁的释放,就是独占线程释放锁,所以独占所有者线程设为空
            setExclusiveOwnerThread(null);
            /*
             *释放锁,表明之前锁已经被获取了,当前只会一个线程能够调用setState(),
             * 所以可以不用compareAndSetState(int,int)方法来进行原子操作的更新,可以直接调用setState()方法
             */
            setState(0);
            return true;
        }

        Condition newCondition() {
            return new ConditionObject();
        }

        private void readObject(ObjectInputStream s)
                throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    // The sync object does all the hard work. We just forward to it.
    private final Sync sync = new Sync();

    //lock接口的实现全部委托给sync
    public void lock() {
        sync.acquire(1);
    }

    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}
Mutex排他锁

 上面的示例中:独占锁Mutex是一个自定义同步组件,它在同一时刻只允许一个线程占有锁。Mutex中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。在tryAcquire(int acquires)方法中,如果经过CAS设置成功(同步状态设置为1),则代表获取了同步状态,而在tryRelease(int releases)方法中只是将同步状态重置为0。用户使用Mutex时并不会直接和内部同步器的实现打交道,而是调用Mutex提供的方法,在Mutex的实现中,以获取锁的lock()方法为例,只需要在方法实现中调用同步器的模板方法acquire(int args)即可,当前线程调用该方法获取同步状态失败后会被加入到同步队列中等待

 

AbstractQueuedSynchronizer类注释中的共享锁示例:

这是一个类似于CountDownLatch的闩锁类,只不过它只需要触发一个信号即可。 因为闩锁是非排他性的,所以它使用共享的获取和释放方法。

class BooleanLatch {

    private static class Sync extends AbstractQueuedSynchronizer {

        boolean isSignalled() {
            return getState() != 0;
        }

        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public boolean isSignalled() {
        return sync.isSignalled();
    }

    public void signal() {
        sync.releaseShared(1);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
}
BooleanLatch共享锁

 

 

3.AQS中的同步队列

主要的成员变量

    private transient volatile Node head;

    private transient volatile Node tail;

    private volatile int state;

 

 AbstractQueuedSynchronizer使用了一个int类型的名为state的成员变量表示同步状态,通过Node类型(Node是AbstractQueuedSynchronizer的静态内部类)的head 、tail头尾节点所确定的FIFO(双向链表型)队列来完成资源获取线程的排队工作。

而节点是构成同步队列(等待队列)的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部

同步队列结构图
 同步队列结构图


从上图可看出,同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,此节点需要添加到同步队列中,为保证线程安全,使用“compareAndSetTail(Node expect,Node update)"方法原子更新地在尾部插入等待节点(需要自旋循环直至更新成功)

等待节点在尾部点入队

 

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点 。获取同步状态成功的线程才能设置首节点,且只有一个线程能获取到同步状态,不存在线程安全问题,所以不用CAS来设置节点

设置首节点

 

4.静态内部类Node的组成

    Node节点是同步器AQS的一个关键内部类,一个Node实例表示一个双向链表型队列的一个节点  。

Node类的全部定义代码

    static final class Node {

        static final Node SHARED = new Node();

        static final Node EXCLUSIVE = null;

        static final int CANCELLED = 1;

        static final int SIGNAL = -1;

        static final int CONDITION = -2;

        static final int PROPAGATE = -3;

        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
View Code

 

4.1 Node的成员变量

        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;

 

前几个属性都是用volatile关键字修饰,保存了内存的可见性,而nextWaiter属性在同步队列中一经初始化便不再变化,在条件队列中用锁保证可见性(在同步锁的lock与unlock之间的代码块中设置获取),因此可不用volatile修饰。

waitStatus属性表示当前节点的等待状态,具体含义见4.2。

thread属性表示当前节点入队等待的线程,即获取同步状态的线程。

prev 、next属性分别表示当前节点的前后驱节点。

nextWaiter属性在描述AQS同步队列的head、 tail两个成员变量中不起太大作用,只是表示共享型或独占型节点,nextWaiter属性主要在描述Condition条件队列的firstWaiter和lastWaiter两个成员变量中起重要作用,表示下一个等待条件的节点。

4.2 Node的静态常量

        static final Node SHARED = new Node();

        static final Node EXCLUSIVE = null;

        static final int CANCELLED = 1;

        static final int SIGNAL = -1;

        static final int CONDITION = -2;

        static final int PROPAGATE = -3;
静态常量SHARED和EXCLUSIVE分别代表在同步队列中当前节点的下一个等待节点的类型常数(共享型还是独占型)。
而静态常量CANCELLED、SIGNAL、CONDITION、PROPAGATE分别表示不同的等待状态,可以作为Node的waitStatus属性值。等待状态waitStatus各个取值的含义:
①CANCELLED,即值为1,同步队列中等待的线程等待超时或被中断,需要从同步队列中取消等待,节点进入此状态后不再变化。
②SIGNAL,即值为-1,后继节点的线程处于等待状, 而当前节点的线程如果释放了同步状态或者被取消,将通知后继节点,后继节点的线程将会运行
③CONDITION,即值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了singal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中
④PROPAGTE,即值为-3,表示下一次共享式同步状获取将会无条件地传播下去
⑤0,表示初始状态。

AQS在状态判断时,用waitStatus>0表示取消状态,waitStatus<=0表示有效状态。

4.2 Node的构造方法

Node() {    // Used to establish initial head or SHARED marker
}
 Node()方法没有方法体,使用属性的默认值,那么属性prev为null,前驱节点为空,表示为头节点;waitNode也为空,作为共享标记。
    Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
Node(Thread , Node)、 Node(Thread , int )节点分别作为等待队列、设置条件时使用。

5.独占锁的同步状态的获取与释放

5.1 获取同步状态

通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

 

基本过程是:获取同步状态,构造等待节点、加入等待队列、自旋式地获取同步状态。

基本逻辑:先通过tryAcquire(int)方法(此方法由使用者自己来重写父类AQS定义相应规则)尝试获取同步状态,如果获取失败则将调用addWaiter()方法构建一个包含当前线程的排他型等待节点加入队列,然后再调用acquireQueued(Node,int)循环自旋CAS地获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现.

现在来看到这几个方法的实现细节:

添加新的等待节点的方法addWaiter(Node)

主要通过调用compareAndSetTail(Node ,Node )方法来保证正确安全地设置尾节点,避免了非线程安全状态下添加新节点时节点先后顺序紊乱的情况。如果一次CAS设置尾节点失败,则直接进入enq(final Node)方法通过“死循环”形式地CAS操作,直到将节点设置成为尾节点成功后方法才能返回结束。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;  //记录旧尾节点
        if (pred != null) {
            //pred不为null表明队列中至少有一个节点
            node.prev = pred;//新添加的节点node作为队列最新的尾节点,那么node的前驱节点则是以前的旧尾节点
            if (compareAndSetTail(pred, node)) {
                /**
                 * cas设置新的尾节点成功,将从方法返回
                 * 旧尾节点的后继节点则是新添加的node节点
                 */
                pred.next = node;
                return node;
            }
        }
        //cas设置新的尾节点失败,进入enq方法,通过CAS进行死循环般地设置新的尾节点
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;//尾节点为null,表明队列中一个节点都没有,是一个空队列
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))//CAS设置地设置头节点
                    tail = head;//此时队列中只有一个节点,头尾节点是同一个节点
            } else {
                node.prev = t;//新添加的节点node作为队列最新的尾节点,那么node的前驱节点则是以前的旧尾节点
                if (compareAndSetTail(t, node)) {//CAS更新失败则继续循环CAS设置尾节点
                    /**
                     * cas设置新的尾节点成功,将从方法返回
                     * 旧尾节点的后继节点则是新添加的node节点
                     */
                    t.next = node;
                    return t;
                }
            }
        }
    }

 

从等待队列中获取同步状态的方法acquireQueued(Node,int)

每个线程都尝试获取同步状态,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中,方法就不能返回。

当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态,其原因主要体现在两方面:①首先队列这种数据结构要保证FIFO先进先出的基本原则。②其次头节点是当前获取到同步状态的节点,只有在头节点释放同步状态,才能通知后继节点可进入同步状态。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;//获取同步状态的标志flag
        try {
            boolean interrupted = false;//线程中断标志的flag
            for (;;) {
                final Node p = node.predecessor(); //p为node的前驱节点
                if (p == head && tryAcquire(arg)) {//p的前驱节点是头节点,且尝试获取同步状成功
                    /**
                     * setHead的方法体代码
                     *  head = node;
                     *  node.thread = null;
                     *  node.prev = null;
                     */
                    setHead(node);//将node设为新的头节点
                    p.next = null; //旧的头节点不会再使用了,将其相关属性解引用,便于垃圾回收
                    failed = false;
                    return interrupted;
                }
                /**
                 * shouldParkAfterFailedAcquire()方法用来检测在同步状态获取失败的情况下,线程是否需要阻塞.
                 * 如果shouldParkAfterFailedAcquire返回true,将会调用parkAndCheckInterrupt()进行线程阻塞,并进行中断状态检测,
                 * parkAndCheckInterrupt()返回是否中断的状态
                 */
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed) //如果同步状态获取失败则取消同步状态的获取
                cancelAcquire(node);
        }
    }

 

自旋地获取同步状态图

上图中可看出:由于非首节点线程前驱节点出队或者被中断而从等待状态返回,随后检查自己的前驱是否是头节点,如果是则尝试获取同步状态。可以看到节点和节点之间在循环检查的过程中基本不相互通信,而是简单地判断自己的前驱是否为头节点,这样就使得节点的释放规则符合FIFO,并且也便于对过早通知的处理(过早通知是指前驱节点不是头节点的线程由于中断而被唤醒)

独占式同步状态获取流程

 

acquireQueued()方法体内部调用过的几个值得注意的方法

1)shouldParkAfterFailedAcquire(Node,Node)用来检测在同步状态获取失败的情况下,线程是否需要阻塞。为什么要判断node是否需要阻塞,因为有可能前面的节点是无效的。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             * 前驱节点刚好是SIGNAL,前驱节点就马上可能获取到同步状态了,
             * 自己可以去休眠睡一会了。如果前驱状态不是SIGNAL,node还要参加同步状态的争夺,它还不能睡觉。
             */
            return true;
        if (ws > 0) { //ws==CANCELLED
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             * 前驱节点被取消了,必须向前遍历找到一个有效状态的节点作为node的前驱节点。
             * 中间这些无效的节点也从同步队列中移除掉了。
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don\'t park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             * 如果前驱正常,那就把前驱的状态设置成SIGNAL。CAS可能失败,pred可能刚释放锁
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;

 

 

2)而 parkAndCheckInterrupt()方法就很简单,休眠当前线程并检测当前线程是否被中断了。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

 

 

3)cancelAcquire(Node)从同步队列中移除无效(取消状态)的节点

主要逻辑是:找到离node最近(node之前)的有效节点pred,同时将node与pred之间的无效节点移除,将node的state无条件设为CANCELLED,将node从队列中移除,更新pred与node.next的相互链接关系。

    private void cancelAcquire(Node node) {
        // Ignore if node doesn\'t exist
        if (node == null)
            return;
        node.thread = null;
        // Skip cancelled predecessors
        //pred表示在队列中node前面的最近的有效节点
        Node pred = node.prev;
        ////向前遍历找到离node最近的有效节点
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
 
        //保存pred的原始后继节点,在CAS更新pre的后继节点将会用到predNext
        Node predNext = pred.next; //
 
        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        /*
         

以上是关于Java并发控制的基础类AbstractQueuedSynchronizer的实现原理简介的主要内容,如果未能解决你的问题,请参考以下文章

java并发-同步容器类

java多线程4.构建并发模块

基础构建模块

Java并发工具类控制并发线程数的Semaphore

《java并发编程实战》读书笔记4--基础构建模块,java中的同步容器类&并发容器类&同步工具类,消费者模式

java中ConcurrentLinkedQueue类