AQS源码分析
Posted 技术秘宝
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS源码分析相关的知识,希望对你有一定的参考价值。
1. AQS简介
AQS(AbstractQueuedSynchronizer)抽象同步队列,AQS是一个用来构建锁和同步器以及队列模型的简单框架。它不仅是JUC锁的底层实现原理,也是面试的热频考点。熟练的掌握其中的知识,对我们来说非常重要。
我们在分析AQS之前先抛出几个问题,看读完这篇文章之后,可不可以很好的回答。
首先我们来看下AQS的UML结构图和整体的架构图来了解一下AQS的框架:
2. AQS原理分析
AQS的核心思想,就是当某个线程想要请求共享资源的时候,如果被请求的共享资源空闲,那么我们就将当前请求资源的线程设置为有效的工作线程,并占取资源,资源设置为锁定状态,如果共享资源被锁定后,其他线程请求该资源时,需要阻塞等待唤醒机制来保证锁分配。也就是说我们需要一个队列来存放阻塞的线程。AQS采用CLH队列的变体实现。
CLH队列本身是单链表,为了适合AQS的整体框架,AQS实际采用的是CLH队列的变体,即虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成队列中的一个节点来实现锁的分配。
主要原理:
在多线程的环境下,如果每个线程在抢占锁资源的时候,都要根据state的情况来分配锁,因此,变量state必须要保证多个线程间的可见性,提到可见性,想必大家都会想到Volatile的特性之一吧,没错,AQS中使用的就是一个Volatile的int类型来表示同步状态。而为了保证每次线程修改state值时的线程安全,我们通过CAS完成对state值的修改。
2.1 AQS底层数据结构CLH单位节点——Node
我们先来看下Node的属性方法图:
static final class Node{
//SHARED 表示该线程是获取共享模式资源时被阻塞挂起放入AQS队列中
static final Node SHARED = new Node();
//EXCLUSIVE 表示该线程是获取独占模式资源时被阻塞挂起放入AQS队列中
static final Node EXCLUSIVE = null;
//waitstate 队列等待线程状态,CANCELLED表示线程获取锁的线程已经取消
//CANCELLED 该状态是由于timeout或者 interrupt
static final int CANCELLED = 1;
/*表示线程需要被唤醒*/
static final int SIGNAL = -1;
//表示该节点在条件队列中,直到它被传输前,都不会被当作同步队列节点
static final int CONDITION = -2;
/*表示线程释放共享资源的时候会通知其他节点,仅用在头结点*/
static final int PROPAGATE = -3;
//节点表示的线程的状态
volatile int waitStatus;
//当前节点的前一个节点
volatile Node prev;
//当前节点的后一个节点
volatile Node next;
//当前节点表示的线程
volatile Thread thread;
// 指向下一个处于condition状态的节点
Node nextWaiter;
//判断该节点是否为共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
/*返回前驱节点,如果前驱节点为空将会抛出NullPointerException的异常*/
//因此,当前驱节点不能为空的时候使用,可以帮助虚拟机减少判空的操作。
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
//用来建立初始的队列头,或者共享状态的标志
Node() { // Used to establish initial head or SHARED marker
}
//向同步队列添加新的节点
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
//向条件队列中添加节点
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
2.1 AQS底层数据结构CLH单位节点——Node
总结:
*线程两种锁的模式
*waitStatus的几个枚举值
2.2 AQS类成员属性分析
那我们先来看看AQS类的几个重要的成员属性
//该属性是同步等候队列的头部,头部如果存在,则waitstatus必不可能是CANCELLED
//head只能通过setHead()的函数,head是lazily initialized
private transient volatile Node head;
//同步等候队列额尾部,tail是延迟初始化,它仅仅只能使用enq()方法添加新元素
private transient volatile Node tail;
//同步资源状态
private volatile int state;
//获取同步资源状态
protected final int getState() {return state;}
//设置同步资源状态,具有volatile语义
protected final void setState(int newState) {state = newState;}
//通过CAS来保证同步状态(state)的原子修改,在内存值等于期望值的情况下
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
我们可以看到getState(),,setState(), compareAndSetState(),方法都是用Final来修饰的,说明子类无法重写它们,我们可以通过State字段表示同步状态来实现多线程的独占模式和共享模式。
2.3 用ReentrantLock来理解AQS阻塞排队源码
首先ReentrantLock是独占锁,有公平和非公平两种锁模式,而AQS又有独占和共享的模式。
独占锁和共享锁,两者的主要区别是什么呢?
独占锁:当某个线程占用资源时,其他线程便只能等待占用锁的线程释放后才可以竞争,并且每次有且仅有一个线程可以竞争成功。
共享锁:贡献资源可以被多个线程同时占有,直到共享资源被占用完毕,常见的有ReadWriteLock,CountDownLatch。
什么是公平锁和非公平锁呢?
公平锁:每个线程当想要抢占资源时,必须按照先来后到的顺序,一个一个来对应到超市购物付款情景下,每个超市的顾客在付款时,应该排好队,不能插队,前面有人在排队了,就主动排队。
非公平锁:非公平锁就是当某个线程请求资源时,不管前面是否有其他线程在占用资源或者排队,强行直接过来尝试占用资源,但是你占用不一定成功,(年轻人不讲武德,可能会被打)。超市排队付款,你说突然来了一位不速之客,直接插队抢占超市自动付款机,是不是可能抢占失败(会被后面的顾客吐槽,或者引起不必要的矛盾)
上面我们简单的了解独占模式和共享模式,以及什么是公平锁,什么是非公平锁。AQS其实就是JUC包中实现锁的底层,我们可以用其实现自定义同步器,而AQS提供了大量用于自定义同步器实现的Protected方法,自定义同步器实现的相关方法也只是为了通过修改State字段来实现线程的独占模式或者共享模式。自定义同步器需要实现以下方法:
//独占模式,arg为获取锁的次数,尝试获取资源,arg一般是1
//如果获取失败,且该线程还没有进入同步队列,则阻塞进入同步队列,直到其他线程释放资源
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//独占模式,arg为释放锁额次数,尝试释放资源
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//共享方式,arg为获取锁的次数,负数表示失败,0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//共享模式,arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待节点返回True,否则返回false
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
//该线程是否正在独占资源,只有用到Condition才需要去实现它。
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
以上几个方法是我们实现自定义同步器需要实现的几个方法(不是全部),就非公平锁和公平锁而言,非公平锁的处理稍微复杂一些,因此,我们以非公平锁为主,来分析ReentrantLock与AQS关联源码。首先,我们通过流程图来分析一下非公平锁获取资源的流程。
为了更好的理解ReentrantLock和AQS方法之间的交互,我们以非公平锁为例,将加锁和解锁交互的过程单独呈现,以便对后续内容的理解。
加锁:
ReentrantLock的加锁方法进行加锁操作。
会调用内部类Sync的Lock方法,由于Sync.lock()是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQS的Acquire方法。
AQS的Acquire方法会执行tryAcquire方法,而tryAcquire则是由自定义同步器自行实现,ReentrantLock则是通过公平锁和非公平锁实现了不同的tryAcquire,在不同的情景下,调用不同的tryAcquire方法。
tryAcquire是获取锁的逻辑,获取失败后,会执行AQS框架的后续逻辑,跟ReentrantLock自定义同步器无关。
解锁:
ReentrantLock通过unlock进行解锁。
Unlock内部会调用内部类Sync的Release方法,该方法是继承与AQS。
而Release内部会调用tryRelease方法,tryRelease方法需要自定义同步器自行实现,且该方法只在内部类Sync中实现,在NonfairSync和FairSync中并没有实现,因此,可以看出在解锁时,ReentrantLock并没有公平锁和非公平锁的区分。
释放成功后,所有处理由AQS框架完成,与自定义同步器无关。
注:sync(该sync非彼Sync,是ReentrantLock类的成员变量,用来表示是采用公平锁还是非公平锁,默认是非公平锁)一般分为NonfairSync和FairSync两种,两者都是继承于Sync(该Sync是继承AQS的抽象类),关系图如下:
通过上面的过程,我们可看出ReentrantLock加锁解锁时API层核心方法的映射。
2.3.1 ReentrantLock源码分析
先看下ReentrantLock的使用方法:
private ReentrantLock lock = new ReentrantLock();
public void run(){
//1. 加锁
lock.lock();
try{
//2. 执行临界区代码
}catch (InterruptedException e){
e.printStackTrace();
}finally{
//3. 解锁
lock.unlock();
}
}
首先我们看下ReentrantLock初始化,它有一个无参构造方法,和一个带参构造函数,无参构造函数默认ReentrantLock是非公平锁,而在带参构造函数中可以通过传入参数来选择ReentrantLock实例时非公平锁还是公平锁。
/*ReentrantLock类下的一个成员变量,同步器会提供全部的实现机制*/
//标注了是公平锁还是非公平锁
private final Sync sync;
//无参构造器默认采用非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//带参构造器可以通过 fair 的值选择公平锁还是非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
当我们初始化ReentrantLock后,会调用lock()方法
/*lock()方法是内部类Sync的一个抽象方法,需要具体的Sync类的对象具体化
具体化的lock()方法中会调用AQS提供的Acquire方法*/
public void lock() {
sync.lock();
}
接下来让我们解读一下FairSync和NonfairSync中lock的具体实现
//非公平锁的lock方法,我们可以看出,非公平锁,一上来就先试着抢占共享资源
final void lock() {
if (compareAndSetState(0, 1))//用CAS来尝试抢占资源
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
/*公平锁的lock方法,由于是公平锁,一上来不会先抢占资源,而是通过acquire方法继续后面的操作*/
final void lock() {
acquire(1);
}
可以看出当我们调用ReentrantLock#lock方法后,其实就会调用AQS提供的acquire(arg)方法。接下来我们就来揭开acquire方法的神秘面纱
/*在独占模式下,忽视中断,通过调用至少一次的tryAcquire(该方法由自定义同步器自行实现)方法,
并且我们看到acquire是用final修饰,不可以被重写*/
//一般arg为1,表示占取资源后将state设置为1.
public final void acquire(int arg) {
//当tryAcquire抢占失败后,我们就调用acquireQueued方法,将线程阻塞
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();//中断当前线程
}
AQS只提供了一个用protected修饰的tryAcquire方法,具体的实现由自定义同步器自行实现,接下来我们看下ReentrantLock是怎么实现tryAcquire方法的。首先我们先来看看Sync类中的nonfairTryAcquire(int acquires),不知道大家会不会有点奇怪的感觉,nonfairTryAcquire为啥不写在nonFairSync类中,我认为ReentrantLock本就默认非公平锁,因为上面这个原因才把nonfairTryAcquire写在Sync类中。
//非公平锁模式
final boolean nonfairTryAcquire(int acquires) {
//获得当前线程
final Thread current = Thread.currentThread();
int c = getState(); //获得state的状态
if (c == 0) { //如果为0,表示没有其他线程占取。采用CAS,独占锁资源
if (compareAndSetState(0, acquires)) {
//如果CAS操作成功,则将锁的线程设置为当前请求的线程
setExclusiveOwnerThread(current);
//独占成功,返回true
return true;
}
}
//如果state不等于0,说明资源被占用,判断占用线程是否为请求线程
//ReentrantLock是可重入锁,因此,state可以>1,表示该线程再次获得锁资源
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//如果新计算的nextc值没有溢出,则将state的值更新为nextc的值。
setState(nextc);
//同一个线程再次请求锁资源,积累重入锁的数量,占取成功,返回true
return true;
}
return false;
}
那么接下来我们来看看公平锁的tryAcquire(int acquire)方法。该方法写在FairSync类中
//公平锁的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
//获取当前请求线程
final Thread current = Thread.currentThread();
int c = getState(); //获取state的状态值
if (c == 0) { //state为0,说明锁资源还没有被占用
//hasQueuedPredecessors方法,如果当前线程,存在等待队列,即不是队列头,或者队列为空,则返回false,否则为true。
//hasQueuedPredecessors返回false,表示当前线程前面没有其他线程请求资源了,因此,取反为true,通过CAS获取锁
//hasQueuedPredecessors返回true,表示当前线程前面存在其他线程请求资源,因此,取反为false,需要阻塞排队
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
其实公平锁和非公平锁的tryAcquire方法,差别就在于公平锁和非公平锁的概念上,公平锁是先来先服务,后来的就排队等待前面的释放了资源,才可以抢占资源。而非公平锁是不讲武德的,以上来不管前面有没有人排队,我都要试着去抢上一把,抢成功了则独占资源,否则进入等待队列。
如果tryAcquire抢占失败了,我们将会通过addWaiter方法将该线程组成的Node加入到等待队列中
我们需要先了解一下addWaiter方法。addWaiter方法传入一个Node mode(每个节点都有其模式,EXCLUSIVE:独占;SHARED:分享)参数,创建一个当前线程的节点,然后加入等待队列。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//首先通过快速的插入方式,如果插入失败
Node pred = tail; //得到阻塞队列的尾结点
if (pred != null) {//如果尾结点为空,则说明队列没有初始化
//不为空则将node的前置节点赋值为尾结点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
//尾结点的下个节点赋值给node
pred.next = node;
return node;
}
}
//如果尾结点为空,或者尾节点被其他线程修改时,使用enq方法来插入节点
enq(node);
return node;//返回当前线程组成的节点
}
主要的流程如下:
通过当前的线程和锁模式新建一个节点
将pred指针指向尾节点Tail
将New中Node的prev指针指向pred
从上面的代码中存在两个CAS的操作,一个是compareAndSetHead,另一个是compareAndSetTail,接下来让我们阅读以下源码感受下。看看设置等待队列头尾节点的CAS操作。
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
我们可以看到设置头尾节点的CAS方法,compareAndSetHead和compareAndSetTail方法调用的是魔法类Unsafe中的compareAndSwapObject方法,而compareAndSwapObject底层是用c++写的,我们一般是看不到的,需要自己去找jvm的源码,推荐看HotSpot,java8的源码。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
static {
try {
stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
} catch (Exception ex) {
throw new Error(ex);
}
}
从AQS的静态代码块可以看出,都是获取一个对象的属性相对于该对象在内存当中的偏移量,这样我们就可以根据这个偏移量在对象内存当中找到这个属性,tailOffset指的是tail对应的偏移量,所以这个时候会将new出来的Node设置为当前队列的尾节点。同时,由于等待队列是双向链表,也需要将前一个节点指向尾节点。
如果pred指针是Null(说明等待队列中没有元素),或者当前pred指针和Tail指向的位置不同(说明被别的线程已经修改),就需要看一下enq方法。
private Node enq(final Node node) {
for (;;) {
Node t = tail; //获得尾节点
//如果尾节点为空,说明等待队列还没有初始化
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//如果已经初始化或者并发导致队列中有元素了,则将当前node通过CAS操作添加到等待队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如果尾节点为Null,说明队列没有初始化,需要初始化,初始化的头节点并不是当前线程节点,而是调用了无参构造函数的节点。如果已经初始化或者并发导致队列中有元素(这里指,调用addWaiter方法时,本身队列中是没有元素的,但是由于并发,在调用enq的时候,tail节点可能就不为空了)我们只需要将当前node添加为队列尾节点就可以了。其实,addWaiter就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头节点是一个无参构造函数的头节点。
非公平锁获取锁总结:
1. 当没有线程获取到锁时,线程1获取锁成功
2. 线程2申请锁时,但锁被线程1占用,此时线程2放入等待队列阻塞
3. 如果还有其他线程要获取锁,依次在队列中往后排队即可,直到占取锁线程释放锁资源
回到tryAcquire方法中时,我们发现公平锁在判断等待队列时,是使用hasQueuedPredecessors来判断的,如果返回false,说明当前线程可以争取共享资源,否则,说明队列中存在有效队列,当前线程必须加入等待队列阻塞
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
上面代码中最关键的部分就是最后一句h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); 我们来分析下为什么要使用头节点比较,并且为什么要判断下一个节点?
变体CLH队列的头节点是一个虚节点,其不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的,当h!=t时:
*如果s=h.next == null, 这时候表示的是当前队列正在有线程初始化,因为,此时头节点不等于尾节点,说明此时有线程正在加入队列,且只进行了tail指向head,而没有进行到head指向tail,队列中存在有效元素,因此返回true,
*如果s=h.next!=null,则说明队列至少存在一个有效元素,如果此时s.thread ==Thread.currentThread(),则说明等待队列的第一个有效节点的线程与当前请求资源的线程相同,那么当前线程是可以获取资源的,返回true。如果s.thread != Thread.currentThread(),说明等待队列第一个有效节点线程与当前线程不同,当前线程必须加入等待队列。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#enq
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//我们清楚的看到,节点插入队列时是非原子操作的,因此,会存在短暂的head!=tail的情况
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
上面代码来自enq方法,我们可以看到节点入队列不是原子操作,所有会出现短暂的head != tail, 此时Tail指向最后一个节点, 而且Tail指向Head。如果Head没有指向Tail(即队列已经初始化,且存在至少一个有效节点,可见代码8、9、10行),这种情况下也需要将相关线程加入队列中。所以这块代码是为了解决极端情况下的并发问题。
接下来我们来分析分析等待队列中线程出队列的时机,即acquireQueued方法
首先我们回到AQS的acquire方法中:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//阻塞期间被中断过的线程被唤醒获取锁后会在补一个中断
selfInterrupt();
}
前面我们分析了addWaiter方法,就是将线程+节点模式封装成一个Node数据结构,然后将其加入到双端队列中,返回一个包含该线程的Node,并将该Node作为参数,进入到acquireQueued方法中。acquireQueued方法可以对排列中的线程进行“获锁”操作。
既然acquireQueued方法是对排列中的线程进行“获锁”操作的,那么是如何进行相关的操作的呢?它到底是如何判断一个线程何时出队列?如何出队列呢?且看如下源码分析:
final boolean acquireQueued(final Node node, int arg) {
//标记是否成功获取资源
boolean failed = true;
try {
//标记等待过程中是否中断过
boolean interrupted = false;
//开始自旋,要么获取锁,要么中断
for (;;) {
//获取当前节点的前驱节点
final Node p = node.predecessor();
//如果p是头节点,说明该当前节点是等待队列的第一个有效节点,尝试获取锁
//头节点是虚节点
if (p == head && tryAcquire(arg)) {
//当前线程获取成功,头指针移动到当前node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果p为头节点但是当前没有获取到锁(可能非公平锁被抢占了),或者p不是头节点,这时候需要判断当前node是否要被阻塞
//被阻塞的条件(前驱节点的waitStatus为-1 SIGNAL 表示当前线程准备好了,准备释放资源了)防止无限循环浪费资源,具体有两个方法下面新品
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//标记等待过程存在中断
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
接下来我们来细细品味上面acquireQueued方法中出现的三个方法,分别是 setHead、shouldParkAfterFailedAcquire、parkAndCheckinterrupt方法
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private void setHead(Node node) {
//将当前节点设置为虚节点,但是waitStatus的值不会改变会一直沿用
head = node;
node.thread = null;
node.prev = null;
}
setHead方法是把当前节点置为虚节点,但并没有修改waitStatus,因为它是一直需要用的数据。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
//根据当前节点的前驱节点waitStatus值来确定当前节点是否需要阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获得前驱节点的节点状态
int ws = pred.waitStatus;
//如果前驱节点的节点状态是 -1 SIGNAL,说明头节点处于唤醒状态
//则当前节点需要阻塞
if (ws == Node.SIGNAL)
return true;
//通过枚举值我们知道waitStatus>0是取消状态
if (ws > 0) {
do {
//通过do-while循环查找取消节点,把取消节点从队列中删除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//当前置节点的节点状态<=0时,设置前任节点等待为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
如果前驱节点的waitStatus状态≤0,则需要首先将其前驱节点置为SIGNAL,当前节点进入阻塞的一个条件是前驱节点必须为SIGNAL,这样下一次自旋后发现前驱节点为SIGNAL(指的是在acquireQueued方法中的for循环那里自旋),就会返回true。
当shouldParkAfterFailedAcquire 返回true代表线程可以进入阻塞中断,那么下一步parkAndCheckInterrupt就该让线程阻塞了,接下来我们看看parkAndCheckInterrupt的源码
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private final boolean parkAndCheckInterrupt() {
//该方法调用了LockSupport类的park方法
LockSupport.park(this);
//返回了当前阻塞线程的中断状态
return Thread.interrupted();
}
线程阻塞我们是用LockSupport类的park方法来让线程中断的,但是为什么要判断线程是否中断过呢,这是因为线程在阻塞期间如果收到了中断,线程被唤醒(转为运行态)获取锁后(acquireQueued为true)需要补一个中断,自中断的selfInterrupt方法源码如下:
这里寸山有一点不明白,就是为什么线程在阻塞期间中断过,被唤醒获取锁后还需要再补一个中断?欢迎大家联系寸山替我解答疑惑
//是一个方便线程中断的方法
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
总结一下acquireQueued方法的大体流程,如下:
从上图可以看出,想要跳出自旋的循环的前置条件是“前置节点是头节点,且当前线程获取成功”。为了防止因死循环导致CPU资源被浪费,我们会判断前置节点状态来决定是否要将当前线程挂起,具体挂起流程如下(shouldParkAfterFailedAcquire流程):
从队列中释放节点的疑虑打消了,那么又有新问题了:
shouldParkAfterFailedAcquire方法中waitStatus<=0的那些状态被取消的节点是怎样生成的呢?又是什么时候把一个节点的waitStatus设置为-1?
是在什么时间释放节点通知到被挂起的线程呢?
CANCELLED状态节点的生成,我们来看看acquireQueued方法中的Finally代码:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//标记等待过程中是否中断过
boolean interrupted = false;
//开始自旋,要么获取锁,要么中断
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
...
failed = false;
return interrupted;
}
...
}
} finally {
if (failed)
cancelAcquire(node);
}
}
我们可以看到在acquireQueued方法中通过cancelAcquire方法,将Node的状态标记为CANCELLED。接下来,我们逐行来分析这个方法的原理:
private void cancelAcquire(Node node) {
// 如果节点不存在则直接返回
if (node == null)
return;
//将当前节点不关联任何线程,即设置为虚节点
node.thread = null;
//获取当前节点的前驱节点
Node pred = node.prev;
//通过while循环跳过取消状态的node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//获取过滤后的前驱节点的后续节点
Node predNext = pred.next;
//将当前节点的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
//如果当前节点是尾节点,将从后往前的第一个非取消状态节点设置为尾节点
//如果更新成功,则通过CAS将tail.next 设置为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
//如果更新失败
int ws;
//如果当前节点的前驱节点不是头节点,即当前node不是头节点的后续节点,则:1.判断前驱节点的ws==SIGNAL,2. 如果ws不是CALLCELLED状态,则通过CAS操作设置为SINGAL看成功与否
//如果上述,1 or 2中有一个为true,则继续判断当前node的线程是否为null,
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//如果上述if语句条件都满足,则把当前节点的前驱节点的后续节点,通过CAS操作设置为当前node的后续节点
//即 CAS => (Pred.next = node.next) 当然前置条件是当前node.next的节点不能为空 or ws为CANCELLED
//否则没有什么意义,总不能把前驱节点的后续节点接在一个无效的后续节点上面吧
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//如果当前节点是head的后续节点,或者上述条件不满足,那就唤醒当前节点的后续节点
//如果后续节点为空,则我们从尾节点向前找到第一个有效的节点,将其唤醒
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
看完了cancelAcquire方法,我们来总结一下该方法的流程
流程总结
判断当前节点是否存在,不存在直接返回
将当前node的线程置为null,设置为虚节点
获取当前node的前驱节点,如果前驱节点的状态为CANCELLED,那就一直while循环往前遍历,找到第一个waitStatus<=0的节点,将找到的pred节点和当前Node关联,将当前node 的状态设置为CANCELLED。
根据当前节点的位置,考虑三种不同的情况:
当前节点是尾节点。(直接删除当前节点并通过CAS将前驱节点设置为尾节点,且通过CAS将pred.next设置为空)
当前节点是head的后续节点。(说明当前节点即将释放,唤醒当前节点的后续节点,unparkSuccessor(node); )
当前节点既不是头节点的后续节点,也不是尾节点。(如果当前node的后续节点有效,我们则将while遍历得到的ws<=0的pred的后续节点设置为当前节点的后续节点即可,当然为了线程安全,这里也采用CAS操作)
我们通过看图的方式,来分析一下上面的三种情况:
第二种情况就是当前node的前驱节点是head节点,那么我们只需要唤醒CurrentNode节点的后续节点,当后续节点为无效节点时(指某个节点为空,或者waitStatus>0,即为CANCELLED状态)我们便从tail节点,开始从右至左的遍历,直到找到第一个有效的节点,将其唤醒。
情况2中,我们调用了unparkSuccessor方法,该方法传入的是当前节点,负责唤醒传入节点的后续节点。我们来看看该方法的源码吧
//java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
private void unparkSuccessor(Node node) {
//获得当前节点的状态
int ws = node.waitStatus;
//当前节点状态如果不为CANCELLED
if (ws < 0)
//通过CAS操作设置当前节点的ws为0
compareAndSetWaitStatus(node, ws, 0);
//获取当前节点后续节点
Node s = node.next;
//如果后续节点无效,即为null or ws>0 状态为CANCELLED
if (s == null || s.waitStatus > 0) {
//将s置为null
s = null;
//从tail节点开始,从右往左遍历找到第一个有效节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
//将有效节点赋值给当前节点
s = t;
}
if (s != null)
//调用LockSupport类下的unpark方法,唤醒后续节点
LockSupport.unpark(s.thread);
}
通过上面的分析,我们对于CANCELLED节点状态的产生和变化也有了大致的了解,但是我们不难发现,基本所有的操作都是对next指针的操作,而没有对Prev指针的操作,那么什么情况下会对Prev指针进行操作呢?
解答:执行cancelAcquire的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过Try代码块中的shouldParkAfterFailedAcquire方法了),如果此时修改Prev指针,有可能会导致Prev指向另一个已经移除队列的Node,因此这块变化Prev指针不安全。shouldParkAfterFailedAcquire方法中,会执行下面的代码,其实就是在处理Prev指针。shouldParkAfterFailedAcquire是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更Prev指针比较安全。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
2.4 ReentrantLock是如何关联AQS解锁的
上面我们已经剖析了AQS加锁的基本流程,接下来我们来分析分析AQS解锁的基本流程。上面也有所提及ReentrantLock在解锁的时候,是不分公平锁和非公平锁的,接下来我们来分析分析:
AQS解锁首先需要调用ReentrantLock#unlock 方法,该方法通过Sync内部框架类的实例来调用release方法
// java.util.concurrent.locks.ReentrantLock
public void unlock() {
//通过Sync内部框架类来调用release方法
sync.release(1);
}
我们看到,ReentrantLock#unlock方法,实际上是调动了Sync#release 方法,这里需要提下其实该方法还是在AQS中实现的,只是Sync类继承与AQS,于是便可以通过Sync类的实例调用该方法。所以解锁的关键可能在release中能得到体现到,接下来我们来分析一下该方法。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
//我们先需要调用tryRelease方法,并将arg作为参数传入tryRelease
if (tryRelease(arg)) {
//如果tryRelease方法true,则表示该锁没有被任何线程所持有
//获得头节点
Node h = head;
//如果头节点不为空并且头节点的waitStatus不是初始化节点的情况,解除线程挂起状态
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
//返回true
return true;
}
//否则返回false
return false;
}
看到上面的代码,很多小伙伴可能都会在h!=null&&h.waitStatus!=0 处沉思很久,为什么这里的判断条件是这样的呢?这样做到底何意?
h == null时,表示Head还没有初始化,初始情况下,head == null。当第一个节点入队列时,Head会被初始化为一个虚拟节点。所以说如果还没有节点来得及入队,就会出现head == null的情况。
h != null&& waitStatus == 0 表示队列中存在有效节点,且后续节点对应的线程仍在运行。不需要被唤醒
h != null && waitStatus < 0 表示队列中存在有效节点,但是后续节点可能被阻塞了需要被唤醒
要理解上面的几句话,我们还得在来看看unparkSuccessor方法,我们看到我们在唤醒后续节点时,都会判断当前节点的ws,如果当前节点的ws<0,则将当前节点的ws通过CAS赋值为0,也就是当前节点的后续节点需要被唤醒时,当前节点的ws必定不会等于0.
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private void unparkSuccessor(Node node) {
// 获取结点waitStatus
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点的下一个节点
Node s = node.next;
// 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
if (s == null || s.waitStatus > 0) {
s = null;
// 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点unpark
if (s != null)
LockSupport.unpark(s.thread);
}
我们第一次分析unparkSuccessor方法时,肯定小伙伴就存在问题了,问什么当前node.next 为null or ws>0时,选取唤醒节点的时候,我们会从尾部开始往前找到第一个非CANCELLED的节点呢?我们来分析分析
我们还是要回到之前的addWaiter方法:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
我们可以从第8-11行,看出节点入队列并不是原子操作。而Node pred = tail; compareAndSetTail(pred, node)这两个地方可以看作Tail入队的原子操作,也就是如果我们还没有执行到pred.next = node,就调用了unparkSuccessor方法,那么如果从前往后找,可能就找不到当前加入的节点了,因为,此时前驱节点还没有将next指向当前新加入的节点。
除了上面的原因之外,还有一点原因,在产生CANCELLED状态节点的时候,我们先断开了next指针,而perv指针并没有断开。总而言之,我们都是为了遍历整个队列
哈哈,阅读完release方法的源码,我们发现该方法其实还是烟雾弹,虽然有一些解锁释放的操作,但是都依赖于tryRelease类,在ReentrantLock里面的公平锁和非公平锁的父类Sync定义了可重入锁的释放锁机制。tryRelease是AQS中的抽象方法,具体的实现还是在Sync类中。
//java.util.concurrent.locks.AbstractQueuedSynchronizer#tryRelease
//定义在AQS中的方法,具体的实现在Sync类中
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) {
//较少可重入的次数
int c = getState() - releases;
//如果请求释放锁资源的线程与持有锁的线程不同,则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//将释放结果设置为false
boolean free = false;
//如果有线程全部释放,因为ReentrantLock是可重入锁,当将自己占有锁的次数全部释放掉时
//将释放结果设置为null
//将锁持有线程赋值为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//设置state的值
setState(c);
//返回free的结果,free的结果其实就是代表着当前锁是否有线程持有
//如果次数减少后,锁还是有线程占用,则为false,否则为true
return free;
}
2.5 中断恢复后的执行流程
当线程被阻塞后,会执行 return Thread.interrupted方法,这个函数返回的是当前执行线程的中断状态,并清除。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
//parkAndCheckInterrupt主要在acquireQueued方法中调用
//从等待队列中获取节点
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
我们在回到acquireQueued方法(该方法只有当前node.pred是头节点,且抢占锁成功时,才可以跳出循环)当parkAndCheckInterrupt返回true或者False的时候,interrupted的值不同,但是都会执行下次循环,如果下次循环时,获取锁成功,就会把当前interrupted返回。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果acquireQueued为True,就会执行selfInterrupt方法。
//acquire方法部分代码
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
// java.util.concurrent.locks.AbstractQueuedSynchronizer
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
该方法其实是为了中断线程。但为什么获取了锁以后还要中断线程呢?这部分属于Java提供的协作式中断知识内容,感兴趣同学可以查阅一下。这里简单介绍一下:
当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能释放了锁以后被唤醒。因此我们通过Thread.interrupted()方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为false),并记录下来,如果发现该线程中断过,就再中断一次。
线程在等待资源的过程中被唤醒,唤醒后还是会不断去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。
这里的处理方式主要是运用线程池中基本运作单元Worder中的runWorker,通过Thread.interrupted()进行额外的判断处理,感兴趣的同学可以看下ThreadPoolExecutor的源码。
2.6 小结
文章开头,我们罗列了几个问题,那接下来我们来回答一下:
Q:某个线程获取锁失败的后续流程是什么呢?
A:将带有节点模式的线程封装成一个节点,并加入等待队列中。最后存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。
Q:既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
A:是CLH变体的FIFO双端队列。
Q:处于排队等候机制中的线程,什么时候可以有机会获取锁呢?
A:可以详细看下acquireQueued方法的源码分析,相信你会得到答案的。
Q:如果处于排队等候机制中的线程一直无法获取锁,需要一直等待么?还是有别的策略来解决这一问题?
A:线程所在节点的状态会变成取消状态,取消状态的节点会从队列中释放,详细见cancelAcquire方法的源码分析。
Q:Lock函数通过Acquire方法进行加锁,但是具体是如何加锁的呢?
A:AQS的Acquire会调用tryAcquire方法,tryAcquire由各个自定义同步器实现,通过tryAcquire完成加锁过程。
Q:变体CLH双端队列中的节点的有几种waitStatus状态?
A:5种,分别是
初始默认值(0)表示一个Node被初始化的时候的默认值、CANCELLED(1)表示线程获取锁的请求已经取消了、SIGNAL(-1)CONDITION(-2)表示节点在等待队列中,节点线程等待唤醒
PROPAGATE(-3) 当前线程处在SHARED情况下
Q:公平锁和非公平锁是什么?
A:公平锁和非公平锁可以理解为ReentrantLock下的两种线程抢占锁资源的策略。公平锁就是很公正,一个线程在请求锁资源时,先要判断等待队列中是否存在有效节点,如果有,就进入等待队列阻塞,如果没有,就直接抢占锁资源。非公平锁,就是不管等待队列中存不存在有效节点,线程在请求锁资源的时候都需要先尝试抢占锁,如果失败在进入等待队列阻塞。
Q:ReentrantLock是可重入锁吗?
A:答案是肯定的,AQS类中维护了一个用volatile关键字修饰的一个state属性,就记录了获取ReentrantLock锁对象的重入次数。其中volatile是为了在多线程下的可见性,以及原子操作。
3. AQS的应用
3.1 JUC中的应用场景
AQS是JUC包下轻量级锁的基石。因此,基于AQS的同步工具有很多。我们来简单的介绍下,详细的我们会在之后的文章中向大家介绍。
同步工具 |
同步工具与AQS的关联 |
ReentrantLock |
使用AQS保存锁重复持有的次数。当一个线程获取时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。 |
Semaphore |
使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数 |
CountDownLatch |
使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数。 |
CyclicBarrier |
它可以让一组线程全部达到一个状态后再全部同时执行。Cyclic的意义在于当所有等待线程执行完毕,并重置CyclicBarrier的状态后它可以被重用。Barrier的意义在于当线程调用await方法后会被阻塞,阻塞点就称为屏障点,等到所有的线程都调用了await方法后,线程们就冲破屏障,继续向下运行 |
ReentrantReadWriteLock |
使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数 |
ThreadPoolExecutor |
Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease)。 |
3.2 自定义同步工具
了解AQS基本原理以后,根据AQS的知识点,我们来自己实现一个同步工具吧。
同步工具类:
内部类Sync继承与AQS,并重写其中的一些抽象方法 。
维护一个Sync类的实例对象。
实现lock() 和 unlock()方法
public class NewLock {
public static class Sync extends AbstractQueuedSynchronizer{
protected boolean tryAcquire(int arg) {
return compareAndSetState(0,1);
}
protected boolean tryRelease(int arg) {
setState(0);
return true;
}
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
private Sync sync = new Sync();
public void lock(){
sync.acquire(1);
}
public void unlock(){
sync.release(1);
}
}
我们来使用自己完成的自定义Lock来完成一定的同步功能吧:
public class NewMain {
static int count = 0;
static NewLock newLock = new NewLock();
public static void main(String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run() {
try{
// newLock.lock();
for(int i=0;i<100;i++){
count++;
System.out.println("Thread="+Thread.currentThread().getName()+", count = "+count);
}
}catch (Exception e){
e.printStackTrace();
}finally {
// newLock.unlock();
}
}
};
Thread thread0 = new Thread(runnable);
Thread thread1 = new Thread(runnable);
thread0.start();
thread1.start();
thread0.join();
thread1.join();
System.out.println(count);
}
}
我们会发现如果我们不使用自定义的NewLock,每次结果虽然count都是200,但是每次线程执行的顺序都是在不断变化的
当我们将注释注解了,然后在运行程序发现每次两个线程执行的顺序都是一致的,要么线程0,先执行到100,然后线程1接着100继续执行到200,要么线程1先执行到100,线程0接着100继续执行到200.
4. 总结
AQS的底层原理也是面试的热点问题,尤其对于校招来说,经常有面试官会问到,希望看过这篇文章的小伙伴,下次遇到面试官问AQS的时候,可以心不慌,游刃有余,从容不迫的回答面试官的问题。
AQS在JUC包中可以说是中流砥柱了,今后我们如果需要自定义同步器,吃透AQS来说是必不可少的。
最后,篇幅和能力都有限,如果存在问题或者疑问可以随时联系我,我及时修改,亡羊补牢,为时未晚。希望有幸读到文章的小伙伴可以共同进步,逐步走向人生巅峰。
参考资料
加从ReentrantLock的实现看AQS的原理及应用: https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
《Java并发编之美》
以上是关于AQS源码分析的主要内容,如果未能解决你的问题,请参考以下文章
ReentrantLock核心源码分析,AQS独占模式,可重入锁
Java高并发编程实战6,通过AQS源码分析lock()锁机制