AQS抽象同步器的核心原理与实践

Posted BLLBL

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS抽象同步器的核心原理与实践相关的知识,希望对你有一定的参考价值。

基于CAS自旋实现的轻量级锁有两个问题:

(1)CAS空自旋会浪费大量CPU资源。

(2)在CMP架构的CPU会导致“总线风暴”。

解决CAS空自旋的有效方式之一是以空间换时间,比较常见的方案由两种:分散操作和热点和使用队列削峰。JUC使用的是队列削峰的方案解决CAS性能问题(LongAdder是分散热点),它提供了一个双向队列的削峰基类——抽象基础类AbstractQueuedSynchronizer(AQS)。

锁与队列的关系

1.CLH锁的内部队列

CLH自旋锁使用的CLH是一个单向队列,也是一个FIFO队列。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问,队列头部的节点表示占有锁的节点,新加入的抢锁线程需要等待,会插入队列的尾部。

2.分布式锁的内部队列

在分布式锁的实现中,以ZooKeeper的分布式锁为例,就是创建临时节点,顺序执行。

3.AQS的内部队列

AQS是JUC提供的一个用于构建锁和同步容器的基础类。例如ReentrantLock、Semaphore、CountDownLatch、FutureTask等都是基于AQS构建的。AQS解决了实现同步容器时设计的大量细节问题。

AQS是CLH队列的一个变种。AQS队列内部维护的是一个FIFO的双向链表,每个节点有前驱结点和后继节点。每个节点由线程封装,当线程争抢锁失败后会封装成节点加入AQS队列中;当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。

AQS的核心成员

AQS出于“分离变与不变”(人话:单一职责和开闭原则)的原则,基于模版模式实现。AQS为锁获取、锁释放的排队和出队过程提供了一系列的模版方法。由于JUC的显式锁种类丰富,因此AQS将不同锁的具体操作抽取为钩子方法,让各种锁的子类去实现。

状态标志位

AQS中维持了一个单一的volatile变量state,state表示锁的状态。

private volatile int state;

state保证了可见性,所以任何线程通过getState()获取状态都可以得到最新值。AQS提供了compareAndSetState()方法利用底层UnSafe的CAS机制来实现原子性。

protected final boolean compareAndSetState(int exepect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

以ReentrantLock为例,state初始化为0,表示未锁定。A线程执行该锁的lock()操作时,会调用tryAcquire独占该锁并将state加1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0为止,其他线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取锁的(state会累加),这就是可重入。但是,获取多少次就要释放多少次,这样才能保证state回到零态。

AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer,这个基类只有一个变量exclusiveOwnerThread,表示当前占用该锁的线程,并且提供了get和set方法。

队列节点类

Node

FIFO双向同步队列

每当线程通过AQS获取锁失败时,线程将被封装成一个Node节点,通过CAS原子操作插入队列尾部。当有线程释放锁时,AQS会尝试让队头的后继节点占用锁。

JUC显式锁与AQS的关系

AQS是一个同步器,它实现了锁的基本抽象功能,该类是由模版模式来实现的。

1.ReentrantLock与AQS的组合关系

ReentrantLock是一个可重入的互斥锁,可以被单个线程多次获取。ReentrantLock把所有Lock接口的操作都委派到一个Sync类上,该类继承了AbstractQueuedSynchronizer:

static abstract class Sync extends AbstractQueuedSynchronizer { ... }

ReentrantLock支持公平锁和非公平锁。默认情况下是非公平锁。

final static class NonfairSync extends Sync { ... }
final static class FairSync extends Sync { ... }

由ReentrantLock的lock和unlock的源码可以看到,它们只是分别调用了sync对象的lock和release方法。

public void lock() {
        sync.acquire(1);
}

public void unlock() {
    sync.release(1);
}

而Sync内部类只是AQS的子类,所以本质是ReentrantLock的操作是委托给AQS完成的。

AQS的模版流程

AQS定义了两种资源共享方式:

  • Exclusive(独享锁):只有一个线程能占有锁资源,如ReentrantLock。
  • share(共享锁):多个线程可以同时占有资源,如Semaphore、CountDownLatch。

AQS为不同的资源共享方式提供了不同的模版流程,AQS提供了一种实现阻塞锁和依赖FIFO等待队列的同步器的框架。自定义的同步器只需要实现共享资源state的获取与释放方式即可,这些逻辑都编写在钩子方法中。无论是共享锁还是独占锁,AQS在执行模版流程时都会回调自定义的钩子方法。

AQS的钩子方法

自定义同步器时,AQS中需要重写的钩子方法如下:

  • tryAcquire(int):独占锁钩子,尝试获取资源,若成功则返回true,若失败则返回false。
  • tryRekease(int):独占锁钩子,尝试释放资源,若成功则返回true,若失败则返回false。
  • tryAcquireShared(int):共享锁钩子,尝试获取资源,负数表示失败;
  • isHeldExclusively():独占锁钩子,判断该线程是否正在独占资源。只有用到condition条件队列时才需要去实现它。

通过AQS实现简单的独占锁

SimpleMockLock只实现了Lock接口的两个方法:

(1)lock方法:完成显式锁的抢占。

(2)unlock方法:完成显式锁的释放。

SimpleMockLock的锁抢占和释放是委托给Sync实例的方法来实现的。在抢占锁时,AQS的acquire会调用tryAcquire钩子方法;释放锁时,AQS的release会调用tryRelease钩子方法。

内部类Sync继承AQS类时提供了一下两个钩子方法的实现:

(1)tryAcquire:将state设置为1并保存当前线程,表示互斥锁已经占用。

(2)tryRelease:将state设置为0,表示互斥锁已经被释放。

public class SimpleMockLock implements Lock {

    // 同步器实例
    private final Sync sync = new Sync();

    // 自定义的内部类:同步器
    // 直接使用 state 表示锁的状态
    // state = 0 表示锁没有被占用
    // state = 1 表示已经被占用
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            // 接下来不需要使用CAS操作,因为下面的操作不存在并发场景
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    static int i = 0;

    public static void lockAndFastIncrease(Lock lock) {
        lock.lock();
        i++;
        System.out.println(i);
        lock.unlock();
    }

    public static void main(String[] args) {
        LongAdder cnt = new LongAdder();
        final int TURNS = 1000;
        final int THREADS = 10;
        final ExecutorService pool = Executors.newFixedThreadPool(THREADS);
        final SimpleMockLock lock = new SimpleMockLock();
        long start = System.currentTimeMillis();
        for (int i = 0; i < THREADS; i++) {
            pool.submit(() -> {
                try {
                    for (int j = 0; j < TURNS; j++) {
                        lockAndFastIncrease(lock);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        final long l = System.currentTimeMillis() - start;
        System.out.println("耗时:" + l);
        pool.shutdown();
    }

}

AQS锁抢占原理

流程的第一步,显式锁的lock方法会调用同步器基类AQS的模版方法acquire。acquire是AQS封装好的获取资源的公共入口,它是AQS提供的利用独占的方式获取资源的方法,源码如下

public final void acquire(int arg) {
    if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire至少执行一次tryAcquire钩子方法,tryAcquire默认抛出一个异常,具体的获取独占资源state的逻辑需要钩子方法来实现。若调用tryAcquire尝试成功,则acquire将直接返回,表示抢到锁;若不成功,则将线程加入等待队列中。

tryAcquire流程:CAS操作state字段,将值从0改为1,若成功表示锁未被占用,返回true;若失败,则返回false。如果是重入锁,state字段值会累积,表示重入次数。

直接入队:addWaiter。在acquire模版方法中,如果钩子方法tryAcquire返回失败,就构造同步节点(独占式节点模式为Node.EXCLUSIVE),通过addWaiter方法将节点加入同步队列的队尾。

自选入队:enq。addWaiter第一次尝试在尾部添加节点失败,意味有并发抢锁发生,需要自旋。enq方法通过CAS自旋将节点添加到队列尾部。

自旋抢占:acquireQueued。节点入队之后,启动自旋锁的流程,acquireQueued的主要逻辑:当前Node节点线程在死循环中不断获取同步状态,并且在前驱结点上自旋,只有当前驱结点是头结点时才尝试获取锁。为了不浪费资源,如果头结点获取了锁,那么该节点会终止自旋,线程回去执行临界区的代码。其余处于自旋状态的线程当然也不会自旋浪费资源,而是被挂起进入阻塞状态。

Re

《Java高并发核心编程》

虽然讲源码了,但是感觉有些啰嗦,没有重点,笔记做下来感觉太注重细节,没有总结和纲领,看下《Java并发编程之美》试试。

JUC回顾之-AQS同步器的实现原理

1.什么是AQS?

     AQS的核心思想是基于volatile int state这样的volatile变量,配合Unsafe工具对其原子性的操作来实现对当前锁状态进行修改。同步器内部依赖一个FIFO的双向队列来完成资源获取线程的排队工作。

2.同步器的应用

 同步器主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,对同步状态的修改或者访问主要通过同步器提供的3个方法:

  • getState() 获取当前的同步状态
  • setState(int newState) 设置当前同步状态
  • compareAndSetState(int expect,int update) 使用CAS设置当前状态,该方法能够保证状态设置的原子性。

     同步器可以支持独占式的获取同步状态,也可以支持共享式的获取同步状态,这样可以方便实现不同类型的同步组件。

     同步器也是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。

3.AQS同步队列

   同步器AQS内部的实现是依赖同步队列(一个FIFO的双向队列,其实就是数据结构双向链表)来完成同步状态的管理。

   当前线程获取同步状态失败时,同步器AQS会将当前线程和等待状态等信息构造成为一个节点(node)加入到同步队列,同时会阻塞当前线程;

   当同步状态释放的时候,会把首节点中的线程唤醒,使首节点的线程再次尝试获取同步状态。AQS是独占锁和共享锁的实现的父类。   

 

4.AQS锁的类别:分为独占锁和共享锁两种。

  • 独占锁:锁在一个时间点只能被一个线程占有。根据锁的获取机制,又分为“公平锁”和“非公平锁”。等待队列中按照FIFO的原则获取锁,等待时间越长的线程越先获取到锁,这就是公平的获取锁,即公平锁。而非公平锁,线程获取的锁的时候,无视等待队列直接获取锁。ReentrantLock和ReentrantReadWriteLock.Writelock是独占锁。
  • 共享锁:同一个时候能够被多个线程获取的锁,能被共享的锁。JUC包中ReentrantReadWriteLock.ReadLock,CyclicBarrier,CountDownLatch和Semaphore都是共享锁。

  JUC包中的锁的包括:Lock接口,ReadWriteLock接口;Condition条件,LockSupport阻塞原语。      

  AbstractOwnableSynchronizer/AbstractQueuedSynchronizer/AbstractQueuedLongSynchronizer三个抽象类,

  ReentrantLock独占锁,ReentrantReadWriteLock读写锁。CountDownLatch,CyclicBarrier和Semaphore也是通过AQS来实现的。

  下面是AQS和使用AQS实现的一些锁,以及通过AQS实现的一些工具类的架构图:

 

                         图 1.依赖AQS实现的锁和工具类

                                                                                

 

5.AQS同步器的结构:同步器拥有首节点(head)和尾节点(tail)。同步队列的基本结构如下:

 

                                                       图 1.同步队列的基本结构 compareAndSetTail(Node expect,Node update)

  • 同步队列设置尾节点(未获取到锁的线程加入同步队列): 同步器AQS中包含两个节点类型的引用:一个指向头结点的引用(head),一个指向尾节点的引用(tail),当一个线程成功的获取到锁(同步状态),其他线程无法获取到锁,而是被构造成节点(包含当前线程,等待状态)加入到同步队列中等待获取到锁的线程释放锁。这个加入队列的过程,必须要保证线程安全。否则如果多个线程的环境下,可能造成添加到队列等待的节点顺序错误,或者数量不对。因此同步器提供了CAS原子的设置尾节点的方法(保证一个未获取到同步状态的线程加入到同步队列后,下一个未获取的线程才能够加入)。  如下图,设置尾节点:

 图 2.尾节点的设置  节点加入到同步队列

  •  同步队列设置首节点(原头节点释放锁,唤醒后继节点):同步队列遵循FIFO,头节点是获取锁(同步状态)成功的节点,头节点在释放同步状态的时候,会唤醒后继节点,而后继节点将会在获取锁(同步状态)成功时候将自己设置为头节点。设置头节点是由获取锁(同步状态)成功的线程来完成的,由于只有一个线程能够获取同步状态,则设置头节点的方法不需要CAS保证,只需要将头节点设置成为原首节点的后继节点 ,并断开原头结点的next引用。如下图,设置首节点:

图 3.首节点的设置

 6.独占式的锁的获取:调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,即线程获取同步状态失败后进入同步队列,后续对线程进行中断操作时,线程不会从同步队列中移除。

    (1) 当前线程实现通过tryAcquire()方法尝试获取锁,获取成功的话直接返回,如果尝试失败的话,进入等待队列排队等待,可以保证线程安全(CAS)的获取同步状态。

    (2) 如果尝试获取锁失败的话,构造同步节点(独占式的Node.EXCLUSIVE),通过addWaiter(Node node,int args)方法,将节点加入到同步队列的队列尾部。

    (3) 最后调用acquireQueued(final Node node, int args)方法,使该节点以死循环的方式获取同步状态,如果获取不到,则阻塞节点中的线程。acquireQueued方法当前线程在死循环中获取同步状态,而只有前驱节点是头节点的时候才能尝试获取锁(同步状态)( p == head && tryAcquire(arg))。

    原因是:1.头结点是成功获取同步状态的节点,而头结点的线程释放锁以后,将唤醒后继节点,后继节点线程被唤醒后要检查自己的前驱节点是否为头结点。

                2.维护同步队列的FIFO原则,节点进入同步队列以后,就进入了一个自旋的过程,每个节点(后者说每个线程)都在自省的观察。

 下图为节点自旋检查自己的前驱节点是否为头结点:

                              图 4 节点自旋获取同步状态

 

独占式的锁的获取源码:

acquire方法源码如下
/**
     * Acquires in exclusive(互斥) mode, ignoring(忽视) interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued(排队), possibly
     * repeatedly(反复的) blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed(传达) to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     *        
     *  独占式的获取同步状态      
     *        
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }


 

  尝试获取锁:tryAcquire方法:如果获取到了锁,tryAcquire返回true,反之,返回false。

//方法2:
    protected final boolean tryAcquire(int acquires) {
        // 获取当前线程
        final Thread current = Thread.currentThread();
        // 获取“独占锁”的状态,获取父类AQS的标志位
        int c = getState();
        //c == 0 意思是锁(同步状态)没有被任何线程所获取
        //1.当前线程是否是同步队列中头结点Node,如果是的话,则获取该锁,设置锁的状态,并设置锁的拥有者为当前线程
        if (c == 0) {
            if (!hasQueuedPredecessors() &&

// 修改下状态为,这里的acquires的值是1,是写死的调用子类的lock的方法的时候传进来的,如果c == 0,compareAndSetState操作会更新成功为1. compareAndSetState(0, acquires)) {
// 上面CAS操作更新成功为1,表示当前线程获取到了锁,因为将当前线程设置为AQS的一个变量中,代表这个线程拿走了锁。 setExclusiveOwnerThread(current);
return true; } } //2.如果c不为0,即状态不为0,表示锁已经被拿走。
//因为ReetrantLock是可重入锁,是可以重复lock和unlock的,所以这里还要判断一次,获取锁的线程是否为当前请求锁的线程。 else if (current == getExclusiveOwnerThread()) {
//如果是,state继续加1,这里nextc的结果就会 > 1,这个判断表示获取到的锁的线程,还可以再获取锁,这里就是说的可重入的意思
int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

addWaiter方法的源码:回到aquire方法,如果尝试获取同步状态(锁)失败的话,则构造同步节点(独占式的Node.EXCLUSIVE),
通过addWaiter(Node node,int args)方法
将该节点加入到同步队列的队尾。

/**
    * Creates and enqueues node for current thread and given mode.
    *
    * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
    * @return the new node
    * 
    * 
    * 如果尝试获取同步状态失败的话,则构造同步节点(独占式的Node.EXCLUSIVE),通过addWaiter(Node node,int args)方法将该节点加入到同步队列的队尾。
    * 
    */
    private Node addWaiter(Node mode) {
// 用当前线程够着一个Node对象,mode是一个表示Node类型的字段,或者说是这个节点是独占的还是共享的,或者说AQS的这个队列中,哪些节点是独占的,哪些节点是共享的。 Node node
= new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail;
//队列不为空的时候
if (pred != null) { node.prev = pred; // 确保节点能够被线程安全的添加,使用CAS方法
// 尝试修改为节点为最新的节点,如果修改失败,意味着有并发,这个时候进入enq中的死循环,进行“自旋”的方式修改 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } }
//进入自旋 enq(node);
return node; }




enq方法的源码:同步器通过死循环的方式来保证节点的正确添加,在“死循环” 中通过CAS将节点设置成为尾节点之后,当前线程才能从该方法中返回,否则
当前线程不断的尝试设置。

enq方法将并发添加节点的请求通过CAS变得“串行化”了。
/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node\'s predecessor
     * 
     * 同步器通过死循环的方式来保证节点的正确添加,在“死循环” 中通过CAS将节点设置成为尾节点之后,当前线程才能从该方法中返回,否则当前线程不断的尝试设置。
     * enq方法将并发添加节点的请求通过CAS变得“串行化”了。
     * 
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

 

acquireQueued方法:在队列中的线程获取锁的过程:
/**
    * Acquires in exclusive uninterruptible mode for thread already in
    * queue. Used by condition wait methods as well as acquire.
    *
    * @param node the node
    * @param arg the acquire argument
    * @return {@code true} if interrupted while waiting
    * 
    * acquireQueued方法当前线程在死循环中获取同步状态,而只有前驱节点是头节点才能尝试获取同步状态(锁)( p == head && tryAcquire(arg))
    *     原因是:1.头结点是成功获取同步状态(锁)的节点,而头节点的线程释放了同步状态以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头结点。
    *           2.维护同步队列的FIFO原则,节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说是每个线程)都在自省的观察。
    * 
    */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
//死循环检查(自旋检查)当前节点的前驱节点是否为头结点,才能获取锁
for (;;) {
// 获取节点的前驱节点
final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//节点中的线程循环的检查,自己的前驱节点是否为头节点
//将当前节点设置为头结点,移除之前的头节点 setHead(node); p.next
= null; // help GC failed = false; return interrupted; }
// 否则检查前一个节点的状态,看当前获取锁失败的线程是否要挂起
if (shouldParkAfterFailedAcquire(p, node) &&
//如果需要挂起,借助JUC包下面的LockSupport类的静态方法park挂起当前线程,直到被唤醒
parkAndCheckInterrupt()) interrupted = true; } } finally {
//如果有异常
if (failed)
//取消请求,将当前节点从队列中移除 cancelAcquire(node); } }


 

独占式的获取同步状态的流程如下: 

图5 独占式的获取同步状态的流程

 7.独占锁的释放:下面直接看源码:

 

 /* 
1. unlock():unlock()是解锁函数,它是通过AQS的release()函数来实现的。 * 在这里,“1”的含义和“获取锁的函数acquire(1)的含义”一样,它是设置“释放锁的状态”的参数。 * 由于“公平锁”是可重入的,所以对于同一个线程,每释放锁一次,锁的状态-1。 unlock()在ReentrantLock.java中实现的,源码如下:
*/ public void unlock() { sync.release(1); }

 

release()会调用tryRelease方法尝试释放当前线程持有的锁(同步状态),成功的话唤醒后继线程,并返回true,否则直接返回false

    /**
    * Releases in exclusive mode.  Implemented by unblocking one or
    * more threads if {@link #tryRelease} returns true.
    * This method can be used to implement method {@link Lock#unlock}.
    *
    * @param arg the release argument.  This value is conveyed to
    *        {@link #tryRelease} but is otherwise uninterpreted and
    *        can represent anything you like.
    * @return the value returned from {@link #tryRelease}
    * 
    * 
    * 
    */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

 

 // tryRelease() 尝试释放当前线程的同步状态(锁)
  protected final boolean tryRelease(int releases) {
            //c为释放后的同步状态
          int c = getState() - releases;
          //判断当前释放锁的线程是否为获取到锁(同步状态)的线程,不是抛出异常(非法监视器状态异常)
          if (Thread.currentThread() != getExclusiveOwnerThread())
              throw new IllegalMonitorStateException();
          boolean free = false;
          //如果锁(同步状态)已经被当前线程彻底释放,则设置锁的持有者为null,同步状态(锁)变的可获取
          if (c == 0) {
              free = true;
              setExclusiveOwnerThread(null);
          }
          setState(c);
          return free;
      }
      

 释放锁成功后,找到AQS的头结点,并唤醒它即可:

// 4. 唤醒头结点的后继节点
     
     private void unparkSuccessor(Node node) {
         //获取头结点(线程)的状态
        int ws = node.waitStatus;
        //如果状态<0,设置当前线程对应的锁的状态为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
            
        Node s = node.next;
        
         //解释:Thread to unpark is held in successor, which is normally just the next node. 
         //But if cancelled or apparently(显然) null, traverse backwards(向后遍历) from tail to find the actual(实际的) non-cancelled successor(前继节点).
         //从队列尾部开始往前去找最前面的一个waitStatus小于0的节点。
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //唤醒后继节点对应的线程
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

 上面说的是ReentrantLock的公平锁获取和释放的AQS的源码,唯独还剩下一个非公平锁NonfairSync没说,其实,它和公平锁的唯一区别就是获取锁的方式不同,公平锁是按前后顺序一次获取锁,非公平锁是抢占式的获取锁,那ReentrantLock中的非公平锁NonfairSync是怎么实现的呢?

 /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

非公平锁的lock的时候多了上面加粗的代码:在lock的时候先直接用cas判断state变量是否为0(尝试获取锁),成功的话更新成1,表示当前线程获取到了锁,不需要在排队,从而直接抢占的目的。而对于公平锁的lock方法是一开始就走AQS的双向队列排队获取锁。更详细的关于ReentrantLock的实现请看后面写的一篇文章:http://www.cnblogs.com/200911/p/6035765.html

 

 总结:在获取同步状态的时候,同步器维护一个同步队列,获取失败的线程会被加入到队列中并在队列中自旋;移除队列(或停止自旋)的条件是前驱节点为头结点并且获取到了同步状态。在释放同步状态时,同步器调用tryRelease(int args)方法释放同步状态,然后唤醒头结点的后继节点。AQS的实现思路其实并不复杂,用一句话准确的描述的话,其实就是使用标志状态位status(volatile int state)和 一个双向队列的入队和出队来实现。AQS维护一个线程何时访问的状态,它只是对状态负责,而这个状态的含义,子类可以自己去定义。

 

 

 

自己注释的AQS的源码:如下:

 

 

public class AbstractQueuedSynchronizerTest {

    /**
     * 
     * (AQS节点的定义,同步队列的节点定义)
     *
     * <p>
     * 修改历史:                                            <br>  
     * 修改日期            修改人员       版本             修改内容<br>  
     * -------------------------------------------------<br>  
     * 2016年7月4日 上午10:26:38   user     1.0        初始化创建<br>
     * </p> 
     *
     * @author        Peng.Li 
     * @version        1.0  
     * @since        JDK1.7
     */
    static final class Node {

        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode 
         * 
         * */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled 
         *     在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待
         * */
        static final int CANCELLED = 1;

        /** waitStatus value to indicate successor\'s thread needs unparking(唤醒)
         *     后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。
         **/
        static final int SIGNAL = -1;

        /** waitStatus value to indicate thread is waiting on condition 
         *  节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中
         **/
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally(无条件的) propagate(传播)
         * 
         * 表示下一次共享式同步状态获取将会被无条件地传播下去
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated(传播) to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened(干涉).
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn\'t need to
         * signal. So, most code doesn\'t need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         * 
         * 使用CAS更改状态,volatile保证线程可见性,即被一个线程修改后,状态会立马让其他线程可见。
         * 
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueing(入队), and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         * 
         * 前驱节点,当前节点加入到同步队列中被设置
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev\'s from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         * 
         * 后继节点
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         * 
         * 获取同步状态的线程
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive(独有的) mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred(移动到) to the queue(同步队列) to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         * 
         * 等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED常量,
         * 也就是说节点类型(独占和共享)和等待队列中的后继节点共用同一个字段。
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() { // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) { // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    /**
     * Head of the wait queue, lazily initialized.  Except for (除...以外)
     * initialization(初始化), it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.(如果head引用已经存在,等待状态保证不会被取消)
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue(等待队列), lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     * 同步状态,线程可见的,共享内存里面保存
     * 
     */
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> read.
     * @return current state value
     * 
     * 得到同步状态的值
     * 
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Acquires in exclusive(互斥) mode, ignoring(忽视) interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued(排队), possibly
     * repeatedly(反复的) blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed(传达) to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     *        
     *  独占式的获取同步状态      
     *        
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
    * Creates and enqueues node for current thread and given mode.
    *
    * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
    * @return the new node
    * 
    * 
    * 如果尝试获取同步状态失败的话,则构造同步节点(独占式的Node.EXCLUSIVE),通过    addWaiter(Node node,int args)方法将该节点加入到同步队列的队尾。

    * 
    */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // 确保节点能够被安全的添加
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /**
     * Convenience method to interrupt current thread.
     * 分析:如果在acquireQueued()中,当前线程被中断过,则执行selfInterrupt();否则不会执行。
     * 线程在阻塞状态被“中断唤醒”而获取CPU的执行权;但是该线程前面还有其他等待锁的线程,根据公平性原则,该线程仍然无法获取到锁,他会再次阻塞。
     * 直到该线程被他前面等待锁的线程唤醒;线程才会获取锁。该线程“成功获取锁并真正执行起来之前”,他的中断会被忽略并且中断标记会被清除,因为在parkAndCheckInterrupt()中,
     * 我们线程的中断状态时调用了Thread.interrupted(),这个函数在返回中断状态之后,还会清除中断状态,正因为清除了中断状态,所以在selfInterrupt重新产生一个中断。
     * 
     * 
     * 当前线程自己产生一个中断
     */
    private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    /**
    * Acquires in exclusive uninterruptible mode for thread already in
    * queue. Used by condition wait methods as well as acquire.
    *
    * @param node the node
    * @param arg the acquire argument
    * @return {@code true} if interrupted while waiting
    * 
    * acquireQueued方法当前线程在死循环中获取同步状态,而只有前驱节点是头节点才能尝试获取同步状态( p == head && tryAcquire(arg))
    *     原因是:1.头结点是成功获取同步状态的节点,而头节点的线程释放了同步状态以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头结点。
    *           2.维护同步队列的FIFO原则,节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说是每个线程)都在自省的观察。
    * 
    */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = fal

以上是关于AQS抽象同步器的核心原理与实践的主要内容,如果未能解决你的问题,请参考以下文章

AbstractQueuedSynchronizer 原理分析 - 独占/共享模式

AbstractQueuedSynchronizer 原理分析 - 独占/共享模式(转)

原来 AQS实现原理还能如此总结

抽象同步队列AQS

并发编程-并发容器(J.U.C)核心 AbstractQueuedSynchronizer 抽象队列同步器AQS介绍

JUC多线程:AQS抽象队列同步器原理