AQS详解之独占锁模式

Posted fengyun2050

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS详解之独占锁模式相关的知识,希望对你有一定的参考价值。

AQS介绍

AbstractQueuedSynchronizer简称AQS,即队列同步器。它是JUC包下面的核心组件,它的主要使用方式是继承,子类通过继承AQS,并实现它的抽象方法来管理同步状态,它分为独占锁和共享锁。很多同步组件都是基于它来实现的,比如我门常见的ReentrantLock,它是基于AQS的独占锁实现的,它表示每次只能有一个线程持有锁。在比如ReentrantReadWriteLock它是基于AQS的共享锁实现的,它允许多个线程同时获取锁,并发的访问资源。AQS是建立在CAS上的一种FIFO的双向队列,它通过维护一个int类型的state,这个state是用volatile来修饰,从而保证状态的安全行。

AQS对于状态的更改提供了3个方法:

  1. getState() :返回同步状态的当前值

  2. setState() : 设置当前同步状态

  3. compareAndSetState():使用CAS设置当前状态,该方法能够保证状态的原子性。它是通过Unsafe这个类中的native方法来保证的。

如果请求的共享资源空闲,那么就把当前请求的线程设置为工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源占用,那么需要一套线程阻塞等待以及唤醒的锁的分配机制。那么这套机制AQS是用CLH队列锁实现的,获取不到锁的线程将加入到队列中。AQS内部维护的一个同步队列,获取失败的线程会加入到队列中进行自旋,移除队列条件是前驱节点是头节点并且成功获取到了同步状态,释放同步状态AQS会调用unparkSuccessor方法唤醒后继节点。

 

AQS队列内部维护的是一个FIFO的双向链表,如下图。这种结构的特点是每个数据结构都有2个指针,分别指向直接前驱节点和直接的后继节点。这种结构可以从任意的一个节点开始很方便的访问前驱和后继节点。每个Node由线程封装,当竞争失败后会加入到AQS队列

 

下面具体看一下Node组成:

static final class Node {
   /** 表示节点正处于共享模式下等待标记 */
   static final Node SHARED = new Node();
   /** 表示节点处于独占锁模式的等待标记 */
   static final Node EXCLUSIVE = null;
   /** waitStatus值,表示线程取消 */
   static final int CANCELLED =  1;
   /** waitStatus值,表示线程需要挂起 */
   static final int SIGNAL    = -1;
   /** waitStatus值,表示线程处于等待条件*/
   static final int CONDITION = -2;
   /**waitStatus值,表示下一个共享模式应该无条件传播*/
   static final int PROPAGATE = -3;
   /**状态字段*/
   volatile int waitStatus;
   /**前驱节点 */
   volatile Node prev;
   /**后继节点 */
   volatile Node next;
   /**当前线程*/
   volatile Thread thread;
   /**将此节点入列的线程,用来指向下一个节点*/
   Node nextWaiter;
   /**如果节点在共享模式下等待,则返回true*/
   final boolean isShared() {
       return nextWaiter == SHARED;
  }
   /**返回上一个节点,如果为null则抛出异常,前驱节点不是null使用 */
   final Node predecessor() throws NullPointerException {
       Node p = prev;
       if (p == null)
           throw new NullPointerException();
       else
           return p;
  }
   Node() {    // 用于建立初始化head节点
  }
   Node(Thread thread, Node mode) {     // 由addWaiter使用
       this.nextWaiter = mode;
       this.thread = thread;
  }
   Node(Thread thread, int waitStatus) { // 由Condition使用
       this.waitStatus = waitStatus;
       this.thread = thread;
  }
}

加入队列的过程必须是线程安全的,所以AQS提供了一个基于CAS设置尾节点的方法compareAndSetTail,这个也是unsafe类中的native方法。它需要传入当前线程的认为的尾节点和当前节点,当设置成功后,当前节点和尾部节点建立关联,当前节点正式加入到队列。

出队列设置头节点也必须要要保证线程安全的,AQS是通过CAS设置头节点的compareAndSetHead这个方法来实现的。此方法也是unsafe类型中的native方法。

 

AQS重要方法

AQS使用了模版方法模式,自定义同步器需要重写下面的几个AQS提供的模版方法:

isHeldExclusively()//该线程是否处于独占资源。只有用到condition才需要实现它.
tryAcquire(int)//独占方式获取资源,成功返回true,失败返回false
tryRelease(int)//独占方式释放资源,成功返回true,失败返回false
tryAcquireShared(int)//共享方式获取资源。负数表示失败,0表示成功但是没有剩余可用资源;正数表示成功且有剩余资源
tryReleaseShared(int)//共享方式释放资源.成功返回true,失败返回false.

独占锁的获取是通过AQS提供的acquire()。我门看一下这个方法的源代码:

public final void acquire(int arg) {
   if (!tryAcquire(arg) &&
       acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
       selfInterrupt();
}

发现acquire()获取同步状态成功与否做了2件事情。1成功,方法结束返回,2失败,会将当前线程加入到同步队列,它是通过调用addWaiter()和acquireQueued()方法实现的,我门继续看一下这2个方法的源代码.

private Node addWaiter(Node mode) {
   Node node = new Node(Thread.currentThread(), mode);
   Node pred = tail;
   if (pred != null) {
       node.prev = pred;
       if (compareAndSetTail(pred, node)) {
           pred.next = node;
           return node;
      }
  }
   enq(node);
   return node;
}

通过方法会发现它会先把当前线程封装为Node类型,然后判断尾节点是否为空,如果不为空进行CAS操作入队列,如果为空,那么会调用enq()这个方法,此方法做了通过不断的for循环自旋CAS尾插入节点。

现在我门已经明白独占锁获取失败入队列的过程了,那么对于同步队列的节点会做什么事情来保证自己有机会获取独占锁呢?我门来看一下acquireQueued()这个方法的源代码

final boolean acquireQueued(final Node node, int arg) {
   boolean failed = true;
   try {
       boolean interrupted = false;
       for (;;) {
           final Node p = node.predecessor();//获取前驱节点
           if (p == head && tryAcquire(arg)) {//当前节点是头节点并且成功获取到同步状态,那么获取到锁
               setHead(node);                 
               p.next = null; // help GC
               failed = false;
               return interrupted;
          }
           if (shouldParkAfterFailedAcquire(p, node) &&
               parkAndCheckInterrupt())//获取失败调用的方法
               interrupted = true;
      }
  } finally {
       if (failed)
           cancelAcquire(node);
  }
}

从源代码我门可以看出来这是一个自旋过程(for(;;)),它首先获取当前节点的前驱节点,然后判断当前节点能否获取独占锁,如果前驱节点是头节点并且获取同步状态,那么就可以获取到独占锁。如果获取锁失败线程会进入等待状态等待获取独占锁。

shouldParkAfterFailedAcquire()这个方法主要的逻辑是调用compareAndSetWaitStatus(),使用CAS将节点状态由INITIAL设置为SIGNAL。如果失败会返回false,通过acquireQueued()的自旋转会继续设置,直到设置成功。设置成功后调用parkAndCheckInterrupt()方法,此方法会调用LockSupport.park(this)让该线程阻塞。到此独占锁获取过程已经分析完毕了。

独占锁的释放是用relase()方法,我门来看一下源代码

public final boolean release(int arg) {
   if (tryRelease(arg)) {
       Node h = head;
       if (h != null && h.waitStatus != 0)
           unparkSuccessor(h);
       return true;
  }
   return false;
}

这段代码的逻辑就很容易理解了,如果同步状态释放成功,则执行if语句内的代码,当head不为空并且状态不为0的时候会执行unparkSuccessor()方法,unparkSuccessor方法会执行LookSupport.unpark()方法.每一次释放锁就会唤醒队列中该节点的后继节点,可以进一步的说明获取锁是一个先进先出的过程.

以上是关于AQS详解之独占锁模式的主要内容,如果未能解决你的问题,请参考以下文章

AQS 详解之共享锁模式

Java并发:深入浅出AQS之共享锁模式源码分析

Java并发之AQS源码分析

深入理解AQS(二)- 共享模式

AQS

jdk1.8 J.U.C并发源码阅读------AQS之独占锁的获取与释放