AQS 原理解析以及源码分析
Posted 小图包
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS 原理解析以及源码分析相关的知识,希望对你有一定的参考价值。
目录
什么是AQS?
AbstractQueuedSynchronizer
,可以叫做抽象队列同步器,可以避免处理多个位置上发生的竞争问题。在基于AQS构建的同步器中,只可能在一个时刻发生阻塞,从而降低上下文切换的开销,并提高吞吐量。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient volatile Node head;//头节点
private transient volatile Node tail;//尾节点
private volatile int state; //同步状态
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
...
AQS 的核心也包括了这些方面:同步队列,独占式锁的获取和释放,共享锁的获取和释放以及可中断锁,超时等待锁获取这些特性的实现,这些实际上是AQS提供出来的模板方法。AQS 是一个抽象类,当我们继承 AQS 去实现自己的同步器时,要做的仅仅是根据自己同步器需要满足的性质实现线程获取和释放资源的方式(修改同步状态变量的方式)即可,至于具体线程等待队列的维护(如获取资源失败入队、唤醒出队、以及线程在队列中行为的管理等),AQS 在其顶层已经帮我们实现好了,AQS 的这种设计使用的正是模板方法模式。
AQS实现锁需要重写AQS中三个重要的方法
protected boolean tryAcquire(int arg) {} //独占式获取
protected boolean tryRelease(int arg) {} //独占式释放
protected int tryAcquireShared(int arg) {} //共享式获取
protected boolean tryReleaseShared(int arg) {} //共享式释放
protected boolean isHeldExclusively() {} //判断AQS是否被该线程独占
AQS支持独占锁(exclusive)和共享锁(share)两种模式。
1 独占锁:只能被一个线程获取到(Reentrantlock)
2 共享锁:可以被多个线程同时获取(CountDownLatch,ReadWriteLock).
等待队列
等待队列的节点需要有一个实现类,它正是Node
这个静态内部类。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
// 双向链表的结构
volatile Node prev;
volatile Node next;
// Node对象用来包装线程
volatile Thread thread;
// 用来表明当前node的线程是想要获取共享锁还是独占锁
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
}
作为双向链表的节点,自然需要prev和next,即前驱和后继指针。通过这两个指针将各节点连接起来。
thread成员肯定也是必需的。因为Node就是用来设计包装线程对象的,自然需要一个Thread类型的成员。
nextWaiter成员则是用来表明当前node的线程是想要获取共享锁还是独占锁。注意,这个成员只是这个作用,不是用来连接双向链表的。
最重要的还是waitStatus成员,用来表明node代表线程的状态。
AQS内部的数据结构与原理
AQS内部实现了两个队列,一个同步队列,一个条件队列。
同步队列的作用是:当线程获取资源失败之后,就进入同步队列的尾部保持自旋等待,不断判断自己是否是链表的头节点,如果是头节点,就不断参试获取资源,获取成功后则退出同步队列。
条件队列是为Lock实现的一个基础同步器,并且一个线程可能会有多个条件队列,只有在使用了Condition才会存在条件队列。
独占式获取锁
1 独占锁加锁入口-acquire()方法
/**
以独占模式获取,忽略中断。
*通过至少调用一次{@link#tryAcquire},
*回归成功。否则线程将排队,可能
*反复阻塞和解除阻塞。这种方法可以使用
*实现方法{@link Lock#Lock}。
*
* @param arg获取的参数。此值传递给
*/
public final void acquire(int arg) {
//1
if (!tryAcquire(arg) &&
// 2
//3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1 调用tryAcquire() 方法,返回true则结束(tryAcquire一般都是在AQS的子类中实现)
2 如果调用tryAcquire返回的fasle则调用addWaiter() 方法入队,插入到Node节点的末尾。
步骤如下:
1.tryAcquire方法尝试以独占方式获取资源,在AQS中是一个抛出异常的方法,想要调用子类必须重写该方法;
2.state表示资源状态,不同的锁意义不尽相同,获取到资源后,返回true;返回fasle,进入下一个流程。
3.addWaiter()方法;将线程加入队列尾部,并且标记为独占模式。
4.acquireQueued()方法,让线程在等待中获取资源才返回,如果等待过程中被中断则返回true,否则返回fasle。
5.在 4 中如果被中断过他是不响应的,只有在获取资源后才能进行自我中断,设置中断标识
分析注释2
addWaiter()方法
/**
* 加入独占线程节点到队尾上
*/
private Node addWaiter(Node mode) {
//用当前线程创建一个Node结点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//尝试快读加入队列成功则返回新的结点Node
//失败,则采用自旋加入结点直至成功返回该节点
Node pred = tail;
//队尾非空
if (pred != null) {
node.prev = pred;
//如果CAS入队尾成功 返回结点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果队尾为空
//或者通过CAS进入队尾失败,存在竞争
//通过end()方法自旋
enq(node);
return node;
}
enq()方法 自旋方式使当前线程的node节点插入队尾
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
* 进入enq方法有两种情况
* 1.队尾为空
* 2.通过CAS进入队尾失败,存在竞争
*自旋方式使node进入队尾
*/
private Node enq(final Node node) {
//自旋
for (;;) {
//队尾结点
Node t = tail;
//如果队列为空
if (t == null) { // Must initialize
//创建head结点
//CAS设置队列头结点
if (compareAndSetHead(new Node()))
//此时队列只有一个结点
//头结点等于尾结点
tail = head;
} else {
//设置node结点的prev引用指向t
//node.prev = t;
//CAS设置新的队尾结点为node
node.prev = t;
if (compareAndSetTail(t, node)) {
//原队尾结点的next引用指向node
//node就是新的节点tail
t.next = node;
//自旋结束,返回原尾结点
return t;
}
}
}
}
分析注释3处
acquireQueued() 方法:节点加入队列后,尝试在等待队列中自旋获取资源
/**
*结点加入队列后,尝试在等待队列中自旋获取资源
*/
final boolean acquireQueued(final Node node, int arg) {
//标记是否拿到资源
boolean failed = true;
try {
//标记是否中断
boolean interrupted = false;
//自旋
for (;;) {
//node的前驱结点,会抛出NullPointerException
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//4
if (shouldParkAfterFailedAcquire(p, node) &&
//5
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/*
* 获取Node节点的前驱节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
分析注释4处 shouldParkAfterFailedAcquire() 线程获取资源失败后,判断是否阻塞线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获得前驱结点的状态
//根据前驱结点的状态判断是否将线程阻塞
int ws = pred.waitStatus;
//如果前驱结点已经被设置为SIGNAL状态
if (ws == Node.SIGNAL)
/**
*前驱结点释放资源后马上唤醒后继节点
*返回true 表示阻塞线程
*/
return true;
//如果前驱结点的线程被撤销,跳过所有的被撤销的pro结点
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
注释5处
/**
*阻塞线程,判断阻塞线程是否中断
*/
private final boolean parkAndCheckInterrupt() {
//park()会让当前线程进入waiting状态
//在此状态下,有两种途径可以唤醒该线程
//1:unpark()
//2:interrupt()
LockSupport.park(this);
//返回线程是否被中断,会清除中断标识
return Thread.interrupted();
}
总结
1:尝试获取资源,如果获取资源成功,tryAcquire()方法返回true,则accquire方法执行结束,否则进入步骤2。
2:如果尝试获取资源失败,tryAccquire方法返回false,则执行addWaiter()方法将线程以结点的方式加到队列的末尾。
3:addWaiter方法也许会存在竞争(并发执行的)造成入队为失败,需要通过自旋方式。
4:入队尾成功后,通过accquireQueued方法尝试再队列中获取资源或者阻塞线程。
5:park方法阻塞线程后,等待前驱结点调用unpark方法或者线程中断来唤醒线程。
6:判断线程是否中断并且维护线程中断的标识。
独占锁解锁过程
/**
*释放独占的资源
*会唤醒等待队列中的其他线程来获取资源
*/
public final boolean release(int arg) {
//如果tryRelease释放资源成功
if (tryRelease(arg)) {
//队列中的头结点
Node h = head;
//头结点非空,并且头结点的waitStatus不等于0
if (h != null && h.waitStatus != 0)
//唤醒后继节点,让后继节点竞争资源
unparkSuccessor(h);
//释放独占的资源成功,放回true
return true;
}
//释放独占资源失败,返回false
return false;
}
/**
* 唤醒后继节点
*/
private void unparkSuccessor(Node node) {
/*
*节点的状态
如果小于0,则肯定不是CANCELLED状态
*/
int ws = node.waitStatus;
//cas将节点状态修改为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 节点node的后继节点
*/
Node s = node.next;
//如果后继节点非空,且状态大于0,即CANCELLED
//说明后继节点的线程取消对资源的等待
if (s == null || s.waitStatus > 0) {
//将后继节点至为空
s = null;
//从队尾结点开始向前遍历
//找到队列中的node节点后第一个等待唤醒的节点
//如果遍历到节点t非空且不等于当前结点node
//则校验节点t的状态
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
总结
1:将node节点的状态设置为0
2:寻找到下一个非取消状态的节点s
3:如果节点s不为null,则调用LockSupport.unpark(s.thread)方法唤醒s所在的线程(唤醒线程也是有顺序的,就是添加到CLH队列的顺序)
AQS共享锁的加锁
AQS获取共享锁是通过调用acquireShared() 这个顶层方法,我们看一下这个方法的源代码:
acquireShared 方法
/**
* 以共享模式获取资源。忽略中断
*/
public final void acquireShared(int arg) {
//该方法只有一个分支,判断是否能获取到共享资源
//能够获取到则返回正数,否则,返回负数
if (tryAcquireShared(arg) < 0)
//没有获取到公共资源,将执行此方法
doAcquireShared(arg);
}
这个方法中if判断,当tryAcquireShared()这个返回值是小于0的时候获取锁失败,进入doAcquireShared()方法。tryAcquireShared方法是用来获取共享模式下的锁,对于tryAcquireShared()这个方法我们重点看一下他的返回值。
当失败的时候返回的是负值,如果返回的是0表示获取共享模式成功但是它下一个节点的共享模式无法获取成功。如果返回的是正数也就是大于0,表示当前线程获取共享模式成功,并且它后面的线程也可以获取共享模式。
doAcquireShared 方法
/**
*加入等待队列
*以共享非中断获取同步状态
*/
private void doAcquireShared(int arg) {
//自旋加入队尾,与独占模式类似,只是传入的参数不同
//独占模式夏安安传入参数node.EXCLUSIVE
//共享模式下传入参数Node.SHARED
final Node node = addWaiter(Node.SHARED);
//标记是否中断
boolean failed = true;
try {
boolean interrupted = false;
//自旋开始
for (;;) {
// 获取Node结点的前驱节点
final Node p = node.predecessor();
//判断结点的前驱节点是否为头节点,如果是,尝试获取共享资源
if (p == head) {
//尝试获取共享资源
int r = tryAcquireShared(arg);
//如果r>=0表示获取共享资源成功
if (r >= 0) {
//将当前结点设置为头结点,检查后继节点是否在共享模式下等待
setHeadAndPropagate(node, r);
//原头结点的next引用置为null,方便GC
p.next = null; // help GC
//如果线程被中断
if (interrupted)
//维护中断状态
selfInterrupt();
//标记failed为fasle
failed = false;
return;
}
}
//与独占模式下操作类似
//校验是否需要阻塞线程,判断中断状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
首先调用addWaiter()方法,它主要是封装为Node节点,并且把该节点添加到队列的尾部。此处传入共享模式的参数,节点就变成了共享模式。
当前线程添加到队列后,然后通过自旋(for(;;))获取前驱节点,如果前驱节点是头节点,那么调用tryAcquireShared()方法获取当前节点的状态,注意此方法的返回值在上面已经介绍过,等于0表示不用唤醒后继节点,只有大于0才会唤醒后面的所有节点。
如果获取共享资源成功,调用setHeadAndPropagate方法设置当前节点为头节点,并让原来的头节点出队列。如果在获取锁自旋的过程中中断过,那么将当前线程中断。
如果当前节点的前驱节点不是头节点,通过shouldParkAfterFailedAcquire判断当前线程的状态,如果线程阻塞返回true,否则返回false. parkAndCheckInterrupt方法是指当前线程在获取锁的过程中是否被中断唤醒,如果当前线程状态阻塞并且被中断过那么就把标志为interrupted更新为true。
如果发生异常调用cancelAcquire方法,此方法是把当前节点先更新为取消状态,并清除该节点。
setHeadAndPropagate ()方法
/**
*成为头结点,唤醒后继节点
*/
private void setHeadAndPropagate(Node node, int propagate) {
//获取队列头结点
Node h = head; // Record old head for check below
//把当前结点设置为头结点
setHead(node);
/*
* 三种可以唤醒node的操作
* 1propagate > 0代表后继节点需要被唤醒
* 2原头结点h为空或者ws<0
* 3新的头结点为空或者新的头结点的ws<0
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//找到当前结点的后继节点s
Node s = node.next;
//s=null或者s是共享模式,调用doReleaseShared方法唤醒后继节点
if (s == null || s.isShared())
doReleaseShared();
}
}
一个是当前节点,一个是tryAcquireShared方法的返回值。从源代码中我们看到它首先记录了当前头节点,然后它通过setHead()方法把当前获取到锁的节点设置为头节点。通过if语句把符合条件的继续唤醒后继节点,如果下一个节点为空那么调用doReleaseShared方法,doReleaseShared方法继续唤醒后面的节点
doReleaseShared()
/**
*共享模式的释放操作
*/
private void doReleaseShared() {
/*
*自旋释放后继节点
*/
for (;;) {
//如果是头结点非空并且头结点不等于尾结点:队列中至少两个节点
Node h = head;
if (h != null && h != tail) {
//获取头结点的waitstatus
int ws = h.waitStatus;
//状态是SIGNAL
if (ws == Node.SIGNAL) {
//变量h有waitStatus通过CAS设置为初始状态0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
//如果CAS失败,执行continue进入下一个循环
continue; // loop to recheck cases
//如果cas成功
//唤醒后继节点
//执行过程佟独占模式下的唤醒流程
unparkSuccessor(h);
}
//如果h的ws=0就把h设置为PROPAGATE,表示可以向后传播
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//自旋跳出结果,head不变则跳出自旋,head变化则一直自旋
//如果头结点没有发生变化,表示设置完毕,可以退出循环
//如果头结点发生变化,可能被唤醒的其他结点重新设置了头结点
//这样头结点发生了改变,要进行重试,保证可以传播唤醒信号
if (h == head) // loop if head changed
break;
}
}
当前线程状态如果是Node.SIGNAL,Node.SIGNAL的值是-1,是一个静态常量,此值表示当前线程被挂起。如果当前线程被挂起,那么更新当前线程的状态值为0.如果更新失败那么就继续。更新成功后调用unparkSuccessor()此方法是唤醒共享锁的第一个节点。如果本身头节点属于重置状态waitStatus==0,并且把它设置为传播状态那么就向下一个节点传播。
AQS共享锁的解锁
/**
*共享模式释放资源
*/
public final boolean releaseShared(int arg) {
//尝试释放公共资源
if (tryReleaseShared(arg)) {
//尝试释放资源成功执行,doReleaseShared放发
//同acquireShared方法调用doReleaseShared方法类似
doReleaseShared();
return true;
}
return false;
}
总结
-
多个线程通过调用tryAcquireShared方法获取共享资源,返回值大于等于0则获取资源成功,返回值小于0则获取失败。
-
当前线程获取共享资源失败后,通过调用addWaiter方法把该线程封装为Node节点,并设置该节点为共享模式。然后把该节点添加到队列的尾部。
-
添加到尾部后,判断该节点的上一个节点是不是队列的头节点,如果是头节点,那么该节点的上一个节点出队列并获取共享资源,同时调用setHeadAndPropagate方法把该节点设置为新的头节点,同时唤醒队列中所有共享类型的节点,去获取共享资源。如果获取失败,则再次加入到队列中。
-
如果该节点的前驱节点不是头节点,那么通过for循环进行自旋转等待,直到当前节点的前驱节点是头节点,结束自旋。
以上是关于AQS 原理解析以及源码分析的主要内容,如果未能解决你的问题,请参考以下文章
AQS(AbstractQueuedSynchronizer)源码深度解析—共享式获取锁释放锁的原理一万字