《并发系列一》AbstractQueuedSynchronizer(AQS)- 互斥锁源码剖析
Posted PIGP
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《并发系列一》AbstractQueuedSynchronizer(AQS)- 互斥锁源码剖析相关的知识,希望对你有一定的参考价值。
本系列将开始讲解并发相关知识,说到并发,当然是从并发最底层的同步器-AbstractQueuedSynchronizer讲起,本文将讲解锁的实现,实现锁的前提条件,主要讲解互斥锁的获取及其释放的原理。从线程尝试获取锁,获取锁失败加入阻塞队列进行自旋并自阻塞,到线程被前驱节点唤醒重新获取锁,线程中断的中断补偿,获取锁过程中的异常处理-清除线程节点,执行完并发操作之后锁的释放进行详细的讲解。
如何实现一个锁
想象如果是自己设计一个锁的话,应该如何设计。首先我们必须有一个公共操作对象,用于标记锁的获取与释放,其次要有一个队列,用于记录获取锁失败而等待的线程。当锁释放时,需要通知等待队列中线程,让线程重新尝试获取锁。由此设计一个锁必须要有公共可操作对象与等待队列
公共可操作的对象
为了能让多线程能够有秩序(可以互斥可以共享)访问有限资源,就必须存在一个让所有线程都可以访问并修改的公共对象,通过该公共对象,线程可以知道是否还有剩余资源可以使用或者资源已经耗尽,线程进入等待队列进行等待。线程访问有限资源的规则,完全有该公共对象的值所决定。在AQS中使用int类型state变量来担任此角色,同时为了保证各个线程间的可见性,使用volatile关键字进行修饰,其源码如下:
private volatile int state;
等待队列
当线程不能获取到有限资源时,需要进入等待队列进行等待。每一个等待线程都会被封装到等待队列的节点中。此外锁的类型也会体现在等待节点上,使用互斥锁与共享锁进行标志。在线程等待的过程中线程会发生各种各样的情况,例如被中断或者发生异常等等,为了表示不同的情况,每一个节点都对应不同的状态。在AQS中,定义了内部类Node类定义节点信息,其代码如下
static final class Node {
//两种节点类型 共享与互斥 SHARED、EXCLUSIVE, nextWaiter属性表明节点的类型
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
//四种节点的状态 取消、可唤醒、condition等待、传播, waitStatus属性表明节点的状态
static final int CANCELLED = 1; // >0 表明节点状态不正常,该节点应该被删除
static final int SIGNAL = -1; // 前一个节点释放锁之后,该节点可以获取锁
static final int CONDITION = -2; // 线程因为等待某一条件而进入condition等待队列
static final int PROPAGATE = -3; // 共享锁模式下,当某一节点释放时,可以唤醒多个节点,该状态表明可以
// 表明节点的装填
volatile int waitStatus;
volatile Node prev;
volatile Node next;
//被阻塞的线程
volatile Thread thread;
//表明节点的类型,共享或者独占
Node nextWaiter;
}
由Node节点的定义可知,该等待队列为双向链表,此外为了方便操作,定义了头结点和尾节点,其中AQS的数据结构定义如下:
//队列头节点,代表获取锁的线程
private transient volatile Node head;
//队列尾节点,尾节点需要CAS设值
private transient volatile Node tail;
//公共的状态,用户可以根据该值进行自定义的扩展
private volatile int state;
独占锁-获取
独占锁或者叫互斥锁,意思就是同一时刻只能有一个线程访问临界资源。 对于独占式锁,一般调用的是AQS的acquire()方法,源码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
其中操作流程如下: (1) 尝试获取锁 tryAcquire(arg)方法,该方法没有实现,留给用户扩展使用,如果获取锁则返回,如果获取不到锁执行下一步 (2) 构建一个类型为独占模式的节点,并将节点添加到等待队列 addWaiter方法 (3) 自旋获取锁,如果满足会阻塞线程 acquireQueued()方法 (4)当线程获取锁之后会执行selfInterrupt(), 目的是看一下在线程阻塞的过程中,有没有中断标记,如果存在中断标记,说明发生过中断,那么就需要将中断给补上。可以看出,该种方式的获取锁,会响应中断,但是该中断是在获取锁之后补上的,中断的实时性差,也可以说该方式获取锁进入等待队列等待时是不支持中断的针对上述获取锁的逻辑中,我们主要看一下(2)、(3)两步
addWaiter操作
首先看一下addWaiter()方法,源码如下:
private Node addWaiter(Node mode) {
//构建类型为mode的节点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//如果tail不为null,表明等待队列不空,将node放到tail的后面即可
if (pred != null) {
node.prev = pred;
//此处会有并发操作,因此需要CAS设置
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
该方法主要是保证新构造的节点放入等待队列中,在等待队列不为空的情况下,首先执行一次CAS操作将tail设置为该node,如果成功则返回,如果不成功就执行enq(),enq()中处理了两种情况,一种为等待队列为空的情况,一种为等待队列非空的情况,其源码如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//队列为空时,设置头结点为并发操作,需要CAS设置
//这里注意的是设置的头结点为新初始化的一个节点,不是传入的node
if (compareAndSetHead(new Node()))
tail = head;
} else {
//等待队列非空时,一直循环添加到等待队列的尾部即可,直至成功返回
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
经过上面的操作,获取不到锁的线程将会被构建为一个节点,并将节点放到等待队列的尾部这里需要注意的是,当等待队列为空时,要先创建一个无意义的节点,代表正在获取锁的线程所处的节点(1) 往空的等待队列中添加节点的示意图如下,其中head节点为新创建的节点:(2) 往非空的等待队列中添加节点的示意图如下:
acquireQueued操作
首先贴一下acquireQueued操作的源码,代码如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//前驱为head的话,说明下一个被唤醒就应该是自己,可以尝试获取锁
//也有可能是头结点唤醒了该节点,该节点尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//判断是否可以阻塞
//进行阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
其中操作流程如下: (1)定义了两个状态,failed是否成功,如果失败需要清楚该节点,调用cancelAcquire()方法;interrupted记录是否产生了中断,中断补偿就是靠这个返回值进行判断是否需要补偿 (2)获取当前节点的前驱节点,如果前驱是head,则尝试获取锁,如果获取成功,则设置自己的为head,方法返回。只有当该节点(确切的说是线程)获取了锁之后,该方法才会返回,否则会一直在该方法内循环或者阻塞(3)如果获取锁失败,则进行判断是否满足阻塞的条件,如果符合阻塞的条件就执行park()函数进行阻塞,如果不符合阻塞条件,则进行自旋尝试获取锁,是否满足阻塞条件由shouldParkAfterFailedAcquire()方法判断 (4)如果满足阻塞条件,则进行则塞。当阻塞被唤醒后,检查中断信号,对应于parkAndCheckInterrupt()方法
该操作中涉及的比较重要的操作包括shouldParkAfterFailedAcquire()、parkAndCheckInterrupt()、cancelAcquire()方法,下面将详细的讲解:
shouldParkAfterFailedAcquire操作
该操作主要是根据前驱节点的状态,来决定该节点线程是否阻塞,主要分为两种情况: (1) 如果pre状态为signal,表明前驱节点释放锁时会唤醒自己,自己可以放心的阻塞即可,因此返回true (2)如果pre状态为cancel,表明前驱节点为取消状态,应该删除,这里会往前找到第一个状态<=0的节点作为前驱节点,并删除中间取消的节点,由于不知道新的前驱节点的状态,因此让改线程自旋重新走一遍流程 (3)如果pre状态为其他状态,则使用CAS操作设置状态为Signal,表明pre释放后会唤醒自己,但该操作不一定成功,因此通过自旋操作重新执行之前的流程 其源码如下所示:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果前驱节点状态为signal,表明前驱节点释放后就会唤醒node,直接返回true,进入堵塞
if (ws == Node.SIGNAL)
return true;
//说明前驱节点已经被取消,删除取消的节点,并找到一个未被取消的前驱,返回false,进行重新判断
//因为不知道新的前驱节点是否为头节点,所以重新执行之前的流程
if (ws > 0) {
do {
//讲道理,每一个节点往前找到第一个非取消的节点是不会出现交叉的情况,因此设置prev和next时可以直接赋值
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//其他情况,就是将前驱节点状态修改为signal,表明前驱节点释放后,可以唤醒node,该操作不一定成功,自旋重新判断
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt操作:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
进行阻塞,阻塞被唤醒之后检查是否被中断,并擦除中断标记
cancelAcquire操作 在acquireQueued操作中,是保证线程一定要获取锁之后才会退出的,如果线程在没有获取锁的情况下退出了,我们就会认为这是一种失败状态,当然要对节点进行清理,其中源码如下:
private void cancelAcquire(Node node) {
if (node == null)
return;
//线程类引用置空
node.thread = null;
//找到前驱节点,如果前驱节点状态为取消,同样找到第一个前驱节点状态正常的节点作为前驱
Node pred = node.prev;
while (pred.waitStatus > 0)
//这里不会出现线程安全问题吗????
//这里不会出现线程安全问题,因为修改节点的指针前驱指针只会自己的线程会修改,
//修改后继指针的活就需要使用CAS安全的修改了
node.prev = pred = pred.prev;
//获取到前驱节点的后继节点,用于CAS设置后继节点时使用
Node predNext = pred.next;
//节点的状态设置为取消
node.waitStatus = Node.CANCELLED;
// 如果节点是队尾节点,则CAS设置pre为尾节点,并CAS设置后继节点为null
if (node == tail && compareAndSetTail(node, pred)) {
//修改失败说明队列结构发生了变化,不用关心
compareAndSetNext(pred, predNext, null);
} else {
//节点不为尾节点
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//节点不为头节点
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//如果节点为头节点,那么需要唤醒后继
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
在清理node的过程中,包括以下几点: (1)节点的状态信息修改,线程置空,结点断开 (2)在节点断开的过程中,删除了前驱中取消的节点 (3)如果该节点是尾节点,则CAS重置tail,cas重置tail的next为空(这里允许失败,失败了代表尾节点更新了);如果该节点不为尾节点且不为头结点且prev节点的状态为signal或者设置成功了sinal,那么只需删除node即可,无需其他的操作(删除节点采用CAS操作,如果删除的过程中prev的next节点发生了变化,则不修改,因为prev节点的next发生了变化就不能说明从prev节点到node节点中间的所有节点都为取消节点,操作严谨了一些,但是 这个地方貌似与shouldParkAfterFailedAcquire操作中的删除取消节点的操作有所冲突,shouldParkAfterFailedAcquire操作中就没有使用CAS进行设置next节点,如果大家知道原因可以留言告诉我);如果该节点为头结点或者prev节点的状态不为signal那么立即唤醒后继,让后继节点自己判断自己的后续行为,实际上去执行自旋操作。 (4) 唤醒后继节点操作unparkSuccessor(), 如果当前节点的状态小于0,那么将状态置0(cancelAcquire过来的节点状态肯定不是小于0的,因为已经设置为取消了。这里将节点状态设置为0的意义是什么不是很清楚,设置成功与否也没有关系。是为了保证节点状态的意义???),然后找到第一个状态正常的后继,unpark后继节点,代码如下:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//找到第一个状态正常的后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果后继节点不为null,则唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
独占锁-释放
锁的释放主要讲state的值进行操作,唤醒后继节点,在AQS中对应release()方法,源码如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
其中tryRelease()方法为保留方法,没有实现,留给用户扩展。如果释放锁成功,则唤醒后继。为什么唤醒后继的条件为(h != null && h.waitStatus != 0)?因为h!=null代表等待队列存在,h.waitStatus!=0,正常情况下,后继节点阻塞,头节点一定为signal,才会唤醒后继节点,从该处可以看出不仅仅是只有状态为signal时才会唤醒后继节点
总结
独占锁在获取锁调用的是acquire(int arg)方法, 释放锁调用release(int arg) 方法,在这两个方法中,为了实现不同的独占锁,坐着在实现时空出了获取锁与释放锁的方法tryAcquire(int arg)方法与tryRelease(int arg)方法,这两个方法的返回值都是boolean类型,表明获取锁成功与释放锁成功。在下一节中,我们会通过AQS实现一个我们自己的独占锁,并讲解锁的可重入性。
以上是关于《并发系列一》AbstractQueuedSynchronizer(AQS)- 互斥锁源码剖析的主要内容,如果未能解决你的问题,请参考以下文章
我肝了整整一年才肝出来这份 Java 并发的系列文章,无私奉献给你们