Java 并发之AbstractQueuedSynchronizer(AQS)源码解析
Posted 后台开发学习
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 并发之AbstractQueuedSynchronizer(AQS)源码解析相关的知识,希望对你有一定的参考价值。
关键字:CLH
,Node
,线程
,waitStatus
,CAS
,中断
目录
图解AQS的操作细节
0、前言
1、基本概念
1.1、CAS自旋
1.2、Node
1.3、CLH & AQS
1.4、ReentrantLock
2、图解AQS
2.1、线程A单独运行
2.2、线程B开始运行
2.3、线程C开始运行
2.4、线程A停止运行,线程B继续运行
2.5.1、线程B停止运行,线程C继续运行
2.5.2、线程C放弃竞争
3、问题总结
3.1、为什么在unparkSuccessor操作中从尾节点开始扫描
3.2、公平锁中的hasQueuedPredecessors操作为什么从tail开始
3.3、当前运行的线程在哪里
3.4、节点的waitStatus和AQS的state的区别
4、参考链接
CountDownLatch ==> Java 并发之CountDownLatch 计数器 操作图解细节
0、前言
AQS 全称 AbstractQueuedSynchronizer,中文简称是同步器,是 jdk1.5开始放在JUC中的,是java中关于线程、同步操作的基础组件,JUC作者Doug Lea AQS简介论文,ReentrantLock,ReentrantReadWriteLock,CountDownLatch等都是基于AQS开发的复合特定场景的锁
先介绍AQS中的基本概念,然后具体使用3个线程的操作过程一步一步分析AQS中的操作细节,配合着分析AQS源码,了解其中的原理。本文只分析互斥、非公平的方式竞争的情况
1、基本概念
1.1、CAS自旋
CAS 是Compare And Swap的简称,具有单一变量的原子操作特性,对比成功后进行交换操作,他是乐观操作,期间会无限循环操作,直到对比成功,然后进行后续交互操作
CAS 包含了三个操作数据,内存位置V、预期值A、新预期值B,如果当前内存V存放的数据和A一样,就认为比较成功,然后把当前V所在的位置设置为B
因为会无限循环操作,所以可能导致CPU效率低下,而且运行中还会导致ABA问题,也就是A->B->A的问题,误以为此时数据未发生变化,其实中间已经发生变化。该问题在java中提供了类AtomicStampedReference解决该问题,先会查看当前引用值是否符合期望,ABA也会变成1A->2B->3A,这样很清楚的感知到发生变化了。
1.2、Node
Node 是AQS中的双向链表的节点信息,主要信息如下
// 共享模式
static final Node SHARED = new Node();
// 互斥模式(独斥)
static final Node EXCLUSIVE = null;
// 当前线程取消竞争
static final int CANCELLED = 1;
// 后置节点等待被唤醒
static final int SIGNAL = -1;
// 暂时不明含义
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
// 和共享有关
static final int PROPAGATE = -3;
// 最重要的信息,就是为上面的1,-1,-2,-3,以及0
volatile int waitStatus;
// 前置节点
volatile Node prev;
// 后置节点
volatile Node next;
// 节点存储的线程信息,会被挂起和唤醒
volatile Thread thread;
// 和condition 有关
Node nextWaiter;
1.3、CLH & AQS
在AQS中所有没有抢占到锁的线程最后都会被存储到CLH双向链表中,然后挂起等待被唤醒
// CLH队列的头结点,其不包含线程信息,head永远为null
private transient volatile Node head;
// CLH队列的尾节点,每次新加一个节点都会尾插到最后
private transient volatile Node tail;
// 当前锁被占据的次数,因为可以被一个线程重复占据,所以其值可以大于0
// 没有线程占据,其值就是0
private volatile int state;
// 当前运行的线程,也是占据锁的线程,注意和CLH中的线程无关
private transient Thread exclusiveOwnerThread;
1.4、ReentrantLock
基于AQS开发的可重入、互斥锁,并且可以自行响应线程中断,而基于底层操作系统实现线程安全的synchronized关键字却不没有该功能
Lock lock = new ReentrantLock();
// 可以通过传入的boolean值设置其为公平方式还是不公平方式占据锁
lock.lock();
try {
// 业务代码,线程互斥
}
finally {
lock.unlock();
}
2、图解AQS
现在待运行的三个线程ABC,将会按照如下情况依次执行,三个线程故意设置耗时很长从而出现互斥的情况,必然出现锁竞争,自旋的情况,根据其运行情况,分析AQS的运行过程和原理,
2.1、线程A单独运行
不存在任何存在的竞争问题,A线程获取锁成功,A线程进入到了运行中的情况,此时的AQS执行时,tryAcquire直接成功,看看非公平方式的tryAcquire操作源码
ReentrantLock 类
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取当前线程,此处肯定是A线程了
int c = getState();
if (c == 0) {
// 第一次运行,state=0
if (compareAndSetState(0, acquires)) {
// 1、利用CAS自旋方式,判断当前state确实为0,然后设置成acquire(1)
// 这是原子性的操作,可以保证线程安全
setExclusiveOwnerThread(current);
// 设置当前执行的线程,直接返回为true
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 2、当前的线程和执行中的线程是同一个,也就意味着可重入操作
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
// 表示当前锁被1个线程重复获取了nextc次
return true;
}
// 否则就是返回false,表示没有尝试成功获取当前锁
return false;
}
此时A线程直接在1处被允许,然后设置了当前锁被占据的线程,直接返回,A线程正常运行
2.2、线程B开始运行
现在A线程一直在运行中,B线程开始运行,同样的B线程也要开始尝试获取当前锁,经过tryAcquire操作肯定返回false,获取当前锁失败,原因可同理分析上面1.1的代码块,现在的情形是
A线程:继续运行
B线程:获取锁失败,计划进入到CLH队列中
这时候B线程执行的线路是acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
,先添加一个互斥节点,然后唤醒可以执行的节点线程,这个时候的CLH队列是初始值状态,都指向null
先看看添加互斥节点的代码
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 新建一个节点,包装的线程是当前线程,传入的mode是互斥的
// 当前线程是B线程
Node pred = tail;
// 此时尾节点tail=null
if (pred != null) {
// 当尾节点不为null,也就是CLH存在节点数据时
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 又是CAS操作,判断当前pred节点是否是尾节点,如果是,设置node为尾节点,否则跳过
pred.next = node;
// 双向链表的基本操作,把当前节点当做尾部节点插入
return node;
}
}
// 其实上面这一段代码的重点是**快速**,假定当前不存在互斥或者冲突,能够高效的添加节点成功
enq(node);
// 这个时候B线程创建好一个节点,添加到尾部
return node;
// 然后返回新创建的节点
}
private Node enq(final Node node) {
for (;;) {
// 死循环,可用严格确保可用完整插入到队列尾部
Node t = tail;
if (t == null) {
// 1、如果当前尾部是null,这个地方就是初始化的CLH队列的地方
// B线程肯定会执行到这个地方
if (compareAndSetHead(new Node()))
// CAS自旋方式,设置当前队列头部节点为一个new Node(不包含线程信息)
tail = head;
// 头部和尾部指向同一个新建的new Node 节点
// 注意此时没有退出。。。。只是完成if操作,会继续for循环操作
} else {
// 2、第二次循环到这里,CLH队列只有一个节点new Node
node.prev = t;
if (compareAndSetTail(t, node)) {
// 利用CAS的方式,配合双写链表的连接方式,添加一个节点到尾部
t.next = node;
// 然后返回原尾部节点
return t;
}
}
}
}
经历过上面的代码块的1、2两个步骤时的CLH队列情况是
步骤1 | 步骤2 |
---|---|
|
|
需要注意到head指向的节点线程为null!
再来到acquireQueued操作,去唤醒可能执行的节点线程
final boolean acquireQueued(final Node node, int arg) {
// 这个时候node节点是上图中的tail节点,也就是线程B所在的节点,arg=1
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 查看当前node节点的前置节点p
if (p == head && tryAcquire(arg)) {
// 如果前置节点是头部节点,B线程进行尝试获取锁
// 如果!是如果!
// 尝试获取成功了,也就是A线程执行完成,然后设置当前当前可执行的节点是B线程
setHead(node);
// 把node节点的线程设置为null,然后head指到node上
p.next = null; // help GC
// 回收操作
failed = false;
// 返回,然后将执行的将会是B线程
return interrupted;
}
// 按照我们的线程ABC的样例,上面的if肯定不会执行
// 1、因为即使node的前节点是head,但是也无法获取到锁,因为一直被A线程占有者
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果shouldParkAfterFailedAcquire返回true,则挂起,否则不挂起线程
interrupted = true;
// 2、第一次循环设置了p节点状态位-1,然后B线程循环至这里被挂起
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 返回true,挂起,返回false则不挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前节点的状态=-1,返回true,直接挂起即可
return true;
if (ws > 0) {
// 前置节点状态有>0,说明前驱节点取消了排队,移除掉该节点
do {
node.prev = pred = pred.prev;
// 拆分为2步
// pred = pred.prev
// node.prev = pred
} while (pred.waitStatus > 0);
// 当前节点开始往前扫描,从双向队列中除去>0的节点,直到<=0
pred.next = node;
} else {
// 利用CAS设置前节点状态位-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 总结一下该方法用途:当前节点的前驱节点状态位-1时,挂起当前线程
// 返回false时,也就是不需要挂起,返回true,则需要调用parkAndCheckInterrupt挂起当前线程
运行到现在,B线程挂起,A线程继续在运行
2.3、线程C开始运行
线程C开始运行,如下图
现在的线程情况是
线程A:运行中
线程B:挂起,暂停运行
线程C:开始运行
很明显,C线程会在tryacquire(1)方法中失败,然后同样通过addWaiter方法添加一个节点放在尾部,使得经过addWaiter方法之后的队列成为
又需要经过acquireQueued方法的处理,但是此时此时的节点是线程C,其前置节点是线程B,不是head,直接运行到shouldParkAfterFailedAcquire方法中
此时B线程所在的节点waitStatus=0,只能通过CAS方法设置B节点waitStatus为SIGHNAL(-1),,并且之后又经过1次循环操作,最后返回true,此时队列为
然后同理经过parkAndCheckInterrupt,线程C也被中断了
现在的情况是
线程A:运行中
线程B:挂起,暂停运行
线程C:挂起,暂停运行
2.4、线程A停止运行,线程B继续运行
现在A运行完成了,需要释放占据的锁
ReentrantLock 类
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
// 当前运行的线程和锁住的线程不是同一个,抛出监控状态异常错误
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 如果为0,则意味着这时可以释放锁了
free = true;
setExclusiveOwnerThread(null);
// 设置当前的占据锁的线程为null,给其他线程一个机会
}
setState(c);
// 存在重入的机会,所以不一定就是释放该锁
return free;
}
AbstractQueuedSynchronizer 类
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 返回true,则表示可以唤醒其他挂起的线程,通知他们可以干活了
Node h = head;
if (h != null && h.waitStatus != 0)
// 如果头部节点有效,则开始唤醒CLH队列的节点线程
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// node节点是head,在我们上面已经说明了head节点的waitStatus=-1
int ws = node.waitStatus;
if (ws < 0)
// 1、设置head节点状态为0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 从head下一个节点开始处理
if (s == null || s.waitStatus > 0) {
s = null;
// 如果下一个节点为null,或者下一个节点状态>0(为废弃节点)
// 从尾节点开始扫描,直到扫描到距离head最近的一个<=0的节点信息
// 在我们当前的例子中,肯定就是线程B
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 样例中,s节点就是B线程所在的节点
// 2、唤醒s.thread ,会触发acquireQueued的循环继续执行
LockSupport.unpark(s.thread);
}
经过上面步骤1之后,head节点就被设置为了0
线程B经过LockSupport.unpark操作之后,原本挂起在acquireQueued的循环中,会继续执行
for (;;) {
final Node p = node.predecessor();
// 查看当前node节点的前置节点p
if (p == head && tryAcquire(arg)) {
// 如果前置节点是头部节点,B线程进行尝试获取锁
// 如果!是如果!
// 尝试获取成功了,也就是A线程执行完成,然后设置当前当前可执行的节点是B线程
setHead(node);
// 把node节点的线程设置为null,然后head指到node上
p.next = null; // help GC
// 回收操作
failed = false;
// 返回,然后将执行的将会是B线程
return interrupted;
}
恰好当前B线程的前一个节点p是head,而且线程A也已经释放了,没有占据锁,所以在tryAcquire操作时,可以顺利获取到锁,然后把B线程节点当做head,并且把node节点(原head节点)前置和后置都设置为null
其中图中的双向链表的1、2、3依次断开,原head节点被GC回收,新的CLH队列为
现在的情况是
线程A:运行结束
线程B:运行中
线程C:挂起,暂停运行
2.5.1、线程B停止运行,线程C继续运行
现在的情况是
线程A:运行结束
线程B:运行结束
线程C:运行中
按照1.4小节的套路,最后来到了unparkSuccessor 方法中
设置当前头部节点head的waitStatus为0,然后唤醒线程C,把head节点数据全部清除掉,head和tail将会指向同一个节点,如下图,最后线程C在运行,CLH队列只剩下一个节点,head和tail同时指向该节点。
直到线程C运行完,CLH队列依旧如此,相当于完成了CLH队列的初始化,当下一次又来互斥线程时,又会开始创建节点
2.5.2、线程C放弃竞争
现在模拟的是B线程继续运行,而在CLH中的线程C放弃了竞争这种场景
在上述的操作中是无法直接放弃竞争的,接下来看看是为什么,直接来到线程挂起和唤醒的代码快
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
// 利用另外的线程对当前线程触发中断请求,会直接返回当前中断状态,也就是true
// 那么在acquireQueued 函数中 interrupted 的值被赋值为true
// 其他有任何的改变...
}
只是用来标记曾经被中断过,但是未有任何实际上的放弃竞争的操作
不过上面的finally代码块中有采用cancelAcquire 方法,不过一般情况下这个并不会被执行,而是一种兜底方案
,除非出现了一些意外情况
那么我们该如何确保真的进行放弃竞争操作呢,不采用lock.lock
方法,而是lock.lockInterruptibly()
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
// 有线程中断,直接抛出中断异常
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
// 尾插节点成功
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 重点在这里,一旦出现了异常,则立马抛出该异常情况
// 此时failed 字段肯定是true了
// 需要进入到下面的cancelAcquire 方法中去取消竞争了
throw new InterruptedException();
}
} finally {
if (failed)
// 开始取消竞争
cancelAcquire(node);
}
}
在上面的学习知道,设置节点状态为Node.CANCELLED,然后在后续的队列清理中会清除该节点的,那现在看看是如何主动取消竞争的
private void cancelAcquire(Node node) {
// Ignore if node doesn‘t exist
if (node == null)
return;
node.thread = null;
// 节点线程数据设置为null
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 从当前节点的前置节点开始扫描去掉废弃的节点,这样操作不会出现并发问题
Node predNext = pred.next;
// 设置为取消状态!
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
// node是尾部节点,而且通过CAS自旋成功,设置pred为tail
// pred的next为nul,可以彻底和node 节点切割,node节点等待GC回收
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred‘s next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
// 前节点不是头部节点,而且包含的线程不为null(上面就有操作设置线程为null的情况)
// 或者节点状态可以设置为-1
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// 这样说么下个节点是可以串在一起的!
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
// CAS 操作,设置后置节点
// 这个操作就相当去双写链表去掉node节点
} else {
unparkSuccessor(node);
// 唤醒node的后置节点
// 注意!此时node节点还在链表中
// 他的真正移除是依靠node后置节点的shouldParkAfterFailedAcquire 去移除节点操作完成的
}
node.next = node; // help GC
}
}
总结一下
- 如果尾部节点或者中间节点就依靠从链表中脱离操作实现放弃竞争
- 如果是头部的下一个节点,是依靠唤醒下一个节点后进行的链表清楚操作完成
这样就很清楚线程是如何取消竞争操作的
3、问题总结
上面的case只是各种线程执行方式的一种,实际场景中包含了各种各样的运行方式,每个线程执行的时间也长短不一,现在讨论几个常见的问题
3.1、为什么在unparkSuccessor操作中从尾节点开始扫描
这个问题需要回头观察CLH队列是如何创建的,来到addWaiter方法
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 通过CAS 操作后node节点为tail
// 并且早就设置了node.prev = pred
// 1、也就是前置的链表已经绑定完毕,后置的链表还未绑定
pred.next = node;
return node;
}
}
在代码块1处执行完成之后,还未执行绑定后置链表操作,如果出现唤醒操作,从头部开始遍历存在一定几率使得无法获取到当前最新的节点(因为后置链表还未绑定,无法通过.next获取到下一个节点信息),而从尾部开始扫描则不会导致该问题
很巧妙的利用尾部扫描避免了可能的并发问题
3.2、公平锁中的hasQueuedPredecessors操作为什么从tail开始
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
// 先读取tail节点,再读取head节点
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
在思考这个问题之前,需要时刻认为CLH队列一直都在动态的变化中,head和tail随时会发生变化
在构建CLH链表时,是先设置head,然后设置tail,那么如果有null的情况只有head!=null && tail==null
一旦在这个时刻添加尾节点成功,head.next 的值其实已经变化了,但是如果先获取head可能存在null的情况,所以先获取tail再获取head
3.3、当前运行的线程在哪里
不存放在CLH队列中,样例图中也并未展示出来,其一直处于运行中,主要CLH队列的head节点线程为null
3.4、节点的waitStatus和AQS的state的区别
两者表示的含义完全不一样,节点的waitStatus是表示节点状态,简洁的表达线程的状态,当其值大于0就认为该节点线程放弃竞争锁
AQS中的state表示该锁被占据的次数,某一时刻只能被一个线程一次或者多次占据,其他线程只能被挂起等待
4、参考链接
- https://javadoop.com/2017/07/20/AbstractQueuedSynchronizer 其中的一个点一直没想清楚,启发很大
作者:jwfy
链接:https://www.jianshu.com/p/282bdb57e343
以上是关于Java 并发之AbstractQueuedSynchronizer(AQS)源码解析的主要内容,如果未能解决你的问题,请参考以下文章
Java并发编程:并发容器之ConcurrentHashMap(转载)
Java并发编程:并发容器之ConcurrentHashMap
Java并发编程:并发容器之CopyOnWriteArrayList