AQS (Abstract Queued Synchronizer)源码解析 -- 独占锁与共享锁的加锁与解锁

Posted 小脑斧科技博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS (Abstract Queued Synchronizer)源码解析 -- 独占锁与共享锁的加锁与解锁相关的知识,希望对你有一定的参考价值。

1. 概述

本文,我们来介绍 java 中线程同步的另一个基础工具类 — AQS。

AQS (Abstract Queued Synchronizer) 是 JDK 提供的一套基于 FIFO 同步队列的阻塞锁和相关同步器的一个同步框架,通过 AQS 我们可以很容易地实现我们自己需要的独占锁或共享锁。
java 中,我们曾经介绍过的信号量、ReentrantLock、CountDownLatch 等工具都是通过 AQS 来实现的。

2. AQS — Abstract Queued Synchronizer

AQS 又称为队列同步器,java 是通过 AbstractQueuedSynchronizer 类来实现其思想的。
他通过一个 int 类型的成员变量 state 来控制同步状态,state = 0 时,则说明没有任何线程占用锁,当 state = 1 时,则说明有一个线程目前正在占用锁。
它支持实现共享锁与独占锁,下面我们就从源码来剖析,分析一下 AQS 的实现原理。

3. 类图

AbstractQueuedSynchronizer 继承自抽象类 AbstractOwnableSynchronizer,AbstractOwnableSynchronizer 这个类定义了存储独占当前锁的线程和获取的方法。
AbstractQueuedSynchronizer 类通过内部类 Node 构成的 FIFO 同步队列来完成线程获取锁的排队工作。
同时,AbstractQueuedSynchronizer 通过 ConditionObject 来构建等待队列。

4. Node

static final class Node {
   // 共享模式
   static final Node SHARED = new Node();
   // 独占模式
   static final Node EXCLUSIVE = null;

   // 标识线程已处于结束状态
   static final int CANCELLED =  1;
   // 等待被唤醒状态
   static final int SIGNAL    = -1;
   // 条件状态
   static final int CONDITION = -2;
   // 在共享模式中使用表示获得的同步状态会被传播
   static final int PROPAGATE = -3;

   // 等待状态, 存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4种
   volatile int waitStatus;

   // 同步队列中前驱结点
   volatile Node prev;

   // 同步队列中后继结点
   volatile Node next;

   // 请求锁的线程
   volatile Thread thread;

   // 等待队列中的后继结点
   Node nextWaiter;

   //判断是否为共享模式
   final boolean isShared() {
       return nextWaiter == SHARED;
   }

   //获取前驱结点
   final Node predecessor() throws NullPointerException {
       Node p = prev;
       if (p == null)
           throw new NullPointerException();
       else
           return p;
   }

   //.....
}

Node 类是一个典型的双向链表元素结构,拥有前驱和后继的引用。
其中 SHARED 和 EXCLUSIVE 常量分别代表共享模式和独占模式,所谓共享模式是一个锁允许多条线程同时操作,如信号量 Semaphore 采用的就是基于 AQS 的共享模式实现的。
而独占模式则是同一个时间段只能有一个线程对共享资源进行操作,多余的请求线程需要排队等待,如 ReentranLock。

AbstractQueuedSynchronizer waitStatus 取值

常量 意义
CANCELLED 1 同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,节点终极状态
SIGNAL -1 等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行
CONDITION -2 该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁
PROPAGATE -3 在共享模式中,该状态标识结点的线程处于可运行状态
0 0 初始状态

5. AbstractQueuedSynchronizer 的模板方法

AbstractQueuedSynchronizer 类作为抽象的基础框架类,通过定义模板方法的方式提供了一套实现锁的模板,其最基本的锁实现方式需要子类复写模板:

protected boolean tryAcquire(int arg);            // 获取独占锁
protected boolean tryRelease(int arg);            // 释放独占锁
protected int tryAcquireShared(int arg);        // 获取共享锁
protected boolean tryReleaseShared(int arg);    // 释放共享锁
protected boolean isHeldExclusively();            // 判断是否持有独占锁

ReentrantLock、Semaphore、CountDownLatch 等工具都是通过复写上述若干方法实现的。

6. 独占模式获取锁操作 — acquire

AbstractQueuedSynchronizer 通过 acquire 方法获取锁。
下图是整个加锁过程的流程图。

AQS (Abstract Queued Synchronizer)源码解析 -- 独占锁与共享锁的加锁与解锁

通过整个流程,我们看到,独占模式获取锁的原则就是:

  • 获取锁失败的节点插入队尾
  • 队首不存储任何信息
  • 队首的后继非 CANCELLED 状态节点是唯一有权利获取锁的节点

6.1. acquire 方法

public final void acquire(int arg) {
   if (!tryAcquire(arg) &&
           acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
       selfInterrupt();
}

acquire 方法调用了我们上面已经提到的需要子类复写的获取独占锁方法 tryAcquire,框架只定义了获取失败后如何处理 — addWaiter。

6.2. 获取锁失败后入队操作 — addWaiter

private Node addWaiter(Node mode) {
   Node node = new Node(Thread.currentThread(), mode);
   // Try the fast path of enq; backup to full enq on failure
   Node pred = tail;
   if (pred != null) {
       node.prev = pred;
       if (compareAndSetTail(pred, node)) {
           pred.next = node;
           return node;
       }
   }
   enq(node);
   return node;
}

这是一个典型的双向队列节点入队操作,毋庸细说,他通过我们已经讲到过的 CAS 操作实现了尾节点的判断和更新。

private final boolean compareAndSetTail(Node expect, Node update) {
   return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

而兜底方法 enq 有两种可能被调用。
1. 当前队列为空,尚没有任何元素存在
2. 并发环境下,原子操作 compareAndSetTail 失败

因此,enq 方法中承担了两部分工作的处理:

private Node enq(final Node node) {
   for (;;) {
       Node t = tail;
       if (t == null) { // Must initialize
           if (compareAndSetHead(new Node()))
               tail = head;
       } else {
           node.prev = t;
           if (compareAndSetTail(t, node)) {
               t.next = node;
               return t;
           }
       }
   }
}

可以看到,enq 方法中通过原子操作和循环的方式实现了并发环境下尾节点的添加,之所以这部分逻辑从 addWaiter 中抽离,是因为大部分情况下是不会出现上述两种可能的,将正常业务逻辑与特殊且不常用业务逻辑分离,是值得学习的一种代码组织方式。

6.3. 重新获取锁及更新节点状态 — acquireQueued

如上所述,并发环境下有可能在插入列表之前尚需要等待锁,但在插入列表后,马上又可以获取到锁,因此此时再次获取锁就可以减少不必要的等待。
于是,在上述 addWaiter 方法执行完,又执行了 acquireQueued 方法。

final boolean acquireQueued(final Node node, int arg) {
   boolean failed = true;
   try {
       boolean interrupted = false;
       // 自旋操作
       for (;;) {
           // 获取当前插入节点的前置节点
           final Node p = node.predecessor();
           // 如果当前节点的前置节点是 head 则重新获取锁
           if (p == head && tryAcquire(arg)) {
               setHead(node);
               p.next = null; // help GC
               failed = false;
               return interrupted;
           }
           // 如果需要线程进入等待,则等待
           if (shouldParkAfterFailedAcquire(p, node) &&
                   parkAndCheckInterrupt())
               interrupted = true;
       }
   } finally {
       if (failed)
           cancelAcquire(node);
   }
}

最终,通过 shouldParkAfterFailedAcquire 方法判断是否需要让线程挂起,如果需要挂起,则调用 parkAndCheckInterrupt 方法挂起线程。
线程的挂起最终是通过 Unsafe 的 static native 方法 park 实现的。

6.4. 节点出队 — cancelAcquire

最终,无论是线程获取锁过程中发生异常还是超时唤醒,都需要将当前 node 出队,这就是 cancelAcquire 方法做的事。

private void cancelAcquire(Node node) {
   // Ignore if node doesn't exist
   if (node == null)
       return;

   node.thread = null;

   // Skip cancelled predecessors
   Node pred = node.prev;
   while (pred.waitStatus > 0)
       node.prev = pred = pred.prev;

   // predNext is the apparent node to unsplice. CASes below will
   // fail if not, in which case, we lost race vs another cancel
   // or signal, so no further action is necessary.
   Node predNext = pred.next;

   // Can use unconditional write instead of CAS here.
   // After this atomic step, other Nodes can skip past us.
   // Before, we are free of interference from other threads.
   node.waitStatus = Node.CANCELLED;

   // If we are the tail, remove ourselves.
   if (node == tail && compareAndSetTail(node, pred)) {
       compareAndSetNext(pred, predNext, null);
   } else {
       // If successor needs signal, try to set pred's next-link
       // so it will get one. Otherwise wake it up to propagate.
       int ws;
       if (pred != head &&
               ((ws = pred.waitStatus) == Node.SIGNAL ||
                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
               pred.thread != null) {
           Node next = node.next;
           if (next != null && next.waitStatus <= 0)
               compareAndSetNext(pred, predNext, next);
       } else {
           unparkSuccessor(node);
       }

       node.next = node; // help GC
   }
}

可以看到,上面这段代码处理了三种情况:
1. node 是队尾节点
2. node 的前驱不是队首节点,node 也不是队尾节点
3. node 的前驱是队首节点

其中,对于情况 1 和情况 2,都进行了 node 的出队,但是第三种情况却没有执行出队方法,这是为什么呢?
原因是当前节点已经被标记为 CANCELLED 状态,那么,在后继节点执行 shouldParkAfterFailedAcquire 方法时,会先让其前驱节点,也就是我们当前的 node 节点出队。

6.5. 唤醒后继节点 — unparkSuccessor

由上所述,当 node 是队首节点的后继时,执行了 unparkSuccessor 方法。
当前节点是队列中唯一等待锁的节点,所以必须让出获取锁的权限,让他的后继去获取锁,这就是 unparkSuccessor 方法做的事情。

private void unparkSuccessor(Node node) {
   /*
    * If status is negative (i.e., possibly needing signal) try
    * to clear in anticipation of signalling.  It is OK if this
    * fails or if status is changed by waiting thread.
    */

   int ws = node.waitStatus;
   if (ws < 0)
       compareAndSetWaitStatus(node, ws, 0);

   /*
    * Thread to unpark is held in successor, which is normally
    * just the next node.  But if cancelled or apparently null,
    * traverse backwards from tail to find the actual
    * non-cancelled successor.
    */

   Node s = node.next;
   if (s == null || s.waitStatus > 0) {
       s = null;
       for (Node t = tail; t != null && t != node; t = t.prev)
           if (t.waitStatus <= 0)
               s = t;
   }
   if (s != null)
       LockSupport.unpark(s.thread);
}

代码通过循环的方式,找到了队列中 node 节点后面第一个状态不为 CANCELLED 的节点,执行 LockSupport.unpark 唤醒了该节点对应的线程。
LockSupport.unpark 最终调用了 UNSAFE 类的 unpark 方法。

public static void unpark(Thread thread) {
   if (thread != null)
       UNSAFE.unpark(thread);
}

那么,被唤醒的线程执行了什么呢?我们要找到该线程是什么时候被挂起的,那就是在上述的 acquireQueued 方法中,我们回看 acquireQueued 方法,被唤醒的线程在执行完 Thread.interrupted() 方法后继续循环,尝试获取锁,从而保证了锁的独占与竞争。
parkAndCheckInterrupt 方法并没有对 Thread.interrupted() 的返回值做任何处理,由此可见,acquire 方法获取锁失败的线程是不能被 interrupt 方法中断的。

7. 可中断独占锁加锁 — acquireInterruptibly

上面提到,acquire 方法获取锁失败的线程是不能被 interrupt 方法中断的,AQS 还提供了另一个方法 acquireInterruptibly 加锁,从而让获取锁失败等待的线程可以被中断。

public final void acquireInterruptibly(int arg)
   throws InterruptedException
{
   if (Thread.interrupted())
       throw new InterruptedException();
   if (!tryAcquire(arg))
       doAcquireInterruptibly(arg);
}

7.1. 可中断独占锁加锁失败处理 — doAcquireInterruptibly

可以看到,这里获取锁失败后调用了 doAcquireInterruptibly 方法,doAcquireInterruptibly 方法与 acquire 的结构非常像:

private void doAcquireInterruptibly(int arg)
   throws InterruptedException
{
   final Node node = addWaiter(Node.EXCLUSIVE);
   boolean failed = true;
   try {
       for (;;) {
           final Node p = node.predecessor();
           if (p == head && tryAcquire(arg)) {
               setHead(node);
               p.next = null; // help GC
               failed = false;
               return;
           }
           if (shouldParkAfterFailedAcquire(p, node) &&
               parkAndCheckInterrupt())
               throw new InterruptedException();
       }
   } finally {
       if (failed)
           cancelAcquire(node);
   }
}

可以看到,这里仍然是通过独占模式获取锁,但是在 parkAndCheckInterrupt 返回后,与此前 acquire 方法中继续循环的方式不同,取而代之的,抛出了 InterruptedException 异常,中断线程执行。

8. 独占锁的解锁 — release

独占锁的解锁较为简单,因为加锁成功后,该线程对应的节点已经从同步队列中移除,此处如果解锁成功,只需要唤醒下一个节点去竞争锁即可。

AQS (Abstract Queued Synchronizer)源码解析 -- 独占锁与共享锁的加锁与解锁

public final boolean release(int arg) {
   if (tryRelease(arg)) {
       Node h = head;
       if (h != null && h.waitStatus != 0)
           unparkSuccessor(h);
       return true;
   }
   return false;
}

这里仍然调用了 unparkSuccessor 方法,通过循环的方式,找到了队列中 node 节点后面第一个状态不为 CANCELLED 的节点,执行 LockSupport.unpark 唤醒线程。

9. 共享锁的加锁 — acquireShared

从图上可以看到,共享锁的加锁主要做了下面三项工作:
1. 当线程调用acquireShared()申请获取锁资源时,如果成功,则进入临界区。
2. 当获取锁失败时,则创建一个共享类型的节点并进入一个FIFO等待队列,然后被挂起等待唤醒。
3. 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待。

9.1. 获取锁传递与唤醒 — setHeadAndPropagate

整个流程与独占锁的加锁流程非常类似,最大的区别就是 setHeadAndPropagate 方法,这个方法做的事情就是我们上面提到的:

  • 如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待
//两个入参,一个是当前成功获取共享锁的节点,一个就是tryAcquireShared方法的返回值,注意上面说的,它可能大于0也可能等于0
private void setHeadAndPropagate(Node node, int propagate) {
   Node h = head; //记录当前头节点
   //设置新的头节点,即把当前获取到锁的节点设置为头节点
   //注:这里是获取到锁之后的操作,不需要并发控制
   setHead(node);
   //这里意思有两种情况是需要执行唤醒操作
   //1.propagate > 0 表示调用方指明了后继节点需要被唤醒
   //2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
   if (propagate > 0 || h == null || h.waitStatus < 0 ||
           (h = head) == null || h.waitStatus < 0) {
       Node s = node.next;
       //如果当前节点的后继节点是共享类型获取没有后继节点,则进行唤醒
       //这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
       if (s == null || s.isShared())
           doReleaseShared();
   }
}

private void setHead(Node node) {
   head = node;
   node.thread = null;
   node.prev = null;
}

9.2. 唤醒后继节点 — doReleaseShared

doReleaseShared 方法进行了后续节点的唤醒。

private void doReleaseShared() {
   for (;;) {
       //唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
       //其实就是唤醒上面新获取到共享锁的节点的后继节点
       Node h = head;
       if (h != null && h != tail) {
           int ws = h.waitStatus;
           //表示后继节点需要被唤醒
           if (ws == Node.SIGNAL) {
               //这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
               if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                   continue;      
               //执行唤醒操作      
               unparkSuccessor(h);
           }
           //如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
           else if (ws == 0 &&
                   !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
)
               continue
;                
       }
       //如果头结点没有发生变化,表示设置完成,退出循环
       //如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
       if (h == head)                  
           break;
   }
}

10. 共享锁的解锁 — releaseShard

我们再来看共享锁的解锁操作。

public final boolean releaseShared(int arg) {
   if (tryReleaseShared(arg)) {
       doReleaseShared();
       return true;
   }
   return false;
}

结合上面我们已经介绍过的 doReleaseShared 方法源码,共享锁的解锁非常简单,与独占模式的解锁一样,他在解锁成功以后进行了后续节点的唤醒操作,从而保证锁的竞争。

以上是关于AQS (Abstract Queued Synchronizer)源码解析 -- 独占锁与共享锁的加锁与解锁的主要内容,如果未能解决你的问题,请参考以下文章

JUC中AQS简介

显示锁和aqs

AQS源码探究_04 成员方法解析(释放锁响应中断出队逻辑)

AQS源码探究_04 成员方法解析(释放锁响应中断出队逻辑)

AQS中非公平锁的实现原理简介

从ReentrantLock加锁解锁角度分析AQS