AQS之lock源码分析
Posted 码农后勤部部子
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS之lock源码分析相关的知识,希望对你有一定的参考价值。
文章来源:若水
AQS(AbstractQueuedSynchronizer)是JAVA中众多锁 以及并发⼯具的基础,其底层采⽤ 乐观锁,⼤量使⽤了CAS操作,并且在冲突时,采⽤⾃旋⽅式重试。AQS 虽然被定义为抽象类,但事实上它并不包含任何抽象⽅法,AQS将⼀些需要⼦类覆写的⽅法都设计成protect⽅法(即当前类和⼦类可以调⽤),将其默认实现抛出UnsupportedOperationException异常,如果⼦类使⽤这些⽅法,但是没有覆写,则会抛出异常。如果⼦类没有使⽤到这些⽅法,则不需要任何操作。
并发⼯具套路:状态,队列,CAS状态:由于状态是全局共享,⼀般会被设置成volatile 类型,以保证其修改的可⻅性。队列:队列通常⼀个等待的集合,⼤多数以链表的形式表现,队列采⽤是悲观锁的思想,表示当前所等待的资源、状态或条件短时间内⽆法满⾜,因此,会将当前线程包装成某类型的数据结构,扔到⼀个等待队列中,当⼀定条件满⾜后,再从等待队列中取出。CAS:cas操作是最轻量的并发处理,通常我们对于状态的修改都会⽤到CAS操作,因为状态可能被多个线程同时修改,CAS操作保证同⼀时刻,只有⼀个 线程能修改成功,保证线程安全,CAS操作基本由Unsafe⼯具类的compareAndSwapXXX来实现的,cas采⽤ 乐观锁的思想,因此伴随⾃旋,如果发现⽆法成功,则不断重试,直到成功,⾃旋的表现形式通常 是死循环 for( ;; )
AQS - 状态
AQS - 状态/** * The synchronization state. */private volatile int state;
独占锁 同⼀时刻,锁只能被⼀个线程所持有,通过state变量是否为0,判断当前锁是否被占⽤。
在监视器锁中,我们⽤ObjectMonitor对象的_owner属性记录了当前拥有监视器锁的线程。
在AQS中,我们通过 exclusiveOwnerThread 属性:
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
// 继承⾃ AbstractOwnableSynchronizer
setExclusiveOwnerThread(Thread.currentThread());
exclusiveOwnerThread 属性值即为当前持有锁的线程。
队列
AQS中,队列的实现是⼀个双向链表,被称为sync queue,表示所有等待锁的线程集合,有点类似于synchronized 原理的wait set
并发编程中使⽤队列 将当前线程包装成某类型的数据结构 扔到等待队列中,
static final class Node {
……….
volatile Thread thread;// 节点所代表的线程
// 双向链表,每个节点需要保存⾃⼰的前驱节点和后继节点的引⽤
volatile Node prev;
volatile Node next;
// 线程所处的等待锁的状态, 初始化,该值为0
volatile int waitStatus;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs
unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
// 该属性⽤于条件队列或者共享锁
Node nextWaiter; …………. }
说明:Node类中有⼀个状态变量waitStatus,表示当前Node所代表的线程的等待锁状态。在独占锁模式下,我们只需要关注CANCELLED SIGNAL 两种状态(CANCELLED 线程被取消了,SIGNAL 线程需要被唤醒,CONDITION 线程在条件队列⾥⾯等待,PROPAGATE 释放共享资源时需要通知其他节点)。nextWaiter 属性,它在独占锁模式下永远为null。AQS队列 是双向链表,需要头节点和尾节点:
// 头节点,不代表任何线程,是⼀个哑节点
private transient volatile Node head; /
/ 尾节点,每⼀个请求锁的线程会加到队尾
private transient volatile Node tail;
AQS中队列是⼀个CLH队列,它的head节点永远是⼀个哑节点(dummy node),不代表任何线程(某些情况下可以看作代表了当前持有锁的线程),因此head所指向的Node的thread属性永远是null。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
说明:
⼊队操作:当⼀个线程获取锁失败后该线程会被转换成Node 节点,使⽤enq(fifinal Node node) ⽅法将该节点插⼊到AQS阻塞队列⾥。
compareAndSetHead(new Node()) 初始化head (哑节点,不代表任何线程即thread属性永远是null )
当前 线程没有抢到锁被封装成Node扔到队列中,即使队列是空的,也不会排在第⼆个。
总结
thread: 表示当前Node所表示的线程
waitStatus:表示节点所处的等待状态,共享锁模式只需关注三种状态:SIGNAL=-1
CANCELLED=1 初始态(0)
prev next: 节点的前驱和后继
nextWaiter:进作为标记,值永远null, 表示当前处于独占锁模式
CAS 操作
CAS⽤来改变状态,⼀般 在静态代码块中初始化需要CAS操作的属性的偏移量
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); } }
以上代码可以看出,CAS操作主要针对5个属性,包括AQS三个属性 status、head、tail,以及node对象的两个属性waitStatus、next。
cas 操作
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null,
update); }
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update)
{
return unsafe.compareAndSwapObject(this, tailOffset, expect,
update); }
/**
* CAS waitStatus field of a node.
*/
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update); }
/**
* CAS next field of a node.
*/
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect,
update); }
AQS 核⼼属性
1)锁相关的属性有两个:
/**
* The synchronization state.
*/
private volatile int state; // 锁的状态
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread; // 当前持有锁的线程,
这个属性继承 AbstractOwnableSynchronizer
2) sync queue相关属性, 两个
private transient volatile Node head; // 队头 为dummy node
private transient volatile Node tail; // 队尾 新⼊队的节点
3)队列中的Node中需要关注属性有三组:
volatile Thread thread;// 节点所代表线程
// 双向链表,每个节点需要保存⾃⼰的前驱节点和后继节点的s引⽤
volatile Node prev;
volatile Node next;
// 线程所处的等待锁的状态,初始化时,值为0
volatile int waitStatus;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs
unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
获取锁
ReentrantLock lock = new ReentrantLock();
lock.lock();//3
获取 ⾮公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID =
7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
代码说明:
如果cas状态status 0->1 成功,则设置当前持有锁的线程。
如果cas失败,则
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); }
代码说明:如果获取锁失败,添加Node到sync queue 队列
tryAcquire 由⼦类实现,获取锁的 具体逻辑
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
代码说明:
获取当前锁状态
若锁状态=0
CAS锁状态status 0-> 参数acquires = 1,若true,则设置当前持有锁线程,返回true。
若锁 状态 != 0
判断 当前线程 == 当前持有锁线哼,若true,则 判断最新锁状态值 = 锁状态 + 参 数acquires 值与0的⼤⼩,若 ⼩于0 ,则抛出错误 “超过最⼤锁计数”。
设置新锁状态值 返回true
其他返回false,即 没有获取到锁
addWaiter(Node node)
该⽅法由AQS实现,负责在获取锁失败后调⽤,将当前请求锁的线程包装成Node扔 到sync queue 中,并返回Node
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {// 如果队列不为空,则⽤CAS⽅式将当前节点设为尾节点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node; }
代码说明:
创建Node实例(当前线程,mode = Node.EXCLUSIVE )
即newNode;
获取队列最后⼀个 pred
如果队列最后 ⼀个不为空,则 newNode的 前驱节点为pred,cas队列最后Node,成功后pred 的后驱节点为newNode,即 ⽤CAS⽅式将当前节点设为尾节点,返回 newNode
pred == null 或者 casTail 失败后,enq(node) 将节点插⼊队列,返回节点newNode可⻅ 每⼀个处于独占模式下的节点,它的nextWaiter ⼀定是null
enq
能执⾏这个⽅法,说明当前线程获取锁已经失败了,封装成Node 放进等待队列中,
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head; // 赋值后没有返回
} else { // 尾分叉 到这⾥说明队列已经不为空,再继续尝试将节点加到队
尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
代码说明:
for(;;) 是死循环
获取队尾节点
若为空,则CAS设置head,哑节点没有thread属性赋值,head节点赋值给尾节点,并没有⽴即返回
尾分叉
若不为空,参数node节点的前驱指向尾节点, 将参数node节点cas设置尾节点 直到成功, 原尾节点的后继指向参数节点node
当⼤量线程同时⼊队,同⼀时刻,只有⼀个线程完成,所以出现尾分叉将⼀个节点添加到sync queue的末尾也需要 三步:
1 node的前驱节点为当前的尾节点:node.prev = t
2 修改tail属性,使它指向当前节点
3 修改原来的尾节点,使它的next指向当前节点
因为这三部不是原⼦操作,第⼀步容易成功,第⼆步由⼀个cas操作,并发条件下会失败,很有可能会出现完成第⼆步 没有完成第三步时 即 将新的节点设置成尾节点,此时原来旧的尾节点的next值可能还是null。如果此时有线程恰巧从头节点开始向后遍历链表,则遍历不到新加尾节点。从尾节点遍历有值。
⾄于“分叉”⼊队失败的节点,继续尝试CAS操作,最终都会通过⾃旋尝试⼊队
addWaiter总结
addWaiter(Node.EXCLUSIVE)⽅法最终返回了代表当前线程的Node节点,在返回的那⼀刻,这个节点必然是当时sync queue的尾节点
acquireQueued
1)能执⾏到该⽅法,说明addWaiter⽅法已经成功将封装了当前的Thread节点添加到等待队列的队尾
2)该⽅法中再次尝试获取锁
3)在再次尝试获取锁失败后,判断是否需要把当前线程挂起
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
代码说明:
死循环 for(;;)
获取队列的当前驱节点。
如果 前驱节点是head 且 尝试获取锁 ,若成功,则 设置head 将当前 赋值给head,线程置为空,节点的前驱置为null,节点的后继也置为null,返回中断表示false。
为什么前⾯获取锁失败,这⾥还要再次尝试获取锁?
再次尝试获取锁是基于⼀定条件:当前节点的前驱节点是HEAD节点。head 节点是个哑节点,不代表任何线程,或者代表持有锁的线程,如果当前节点的前驱节点是head节点,说明当亲节点已经排在整个队列最前⾯
shouldParkAfterFailedAcquire
该⽅法⽤于决定在获取锁失败后,
是否将线程挂起判断依据是前驱节点的waitStatus值
waitStatus值的状态:
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
⼀共初始化值是0,不属于上⾯任何⼀种状态。
CANCEL四种状态,独占锁的获取操作,⽤到CANCELLED 和 SIGNAL。LED 状态 ,表示Noded 的当前线程已经取消了排队,即放弃获取锁。
SIGNAL 状态 不是表征当前节点状态,⽽是当前节点的下⼀个节点的状态。当⼀个节点的waitStatus被置为SIGNAL, 说明下⼀个节点(即后继节点) 已经被挂起了(或者),因此当前节点释放或者放弃取锁时,如果它的waitStatus属性为SIGNAL,还有完成⼀个额外操作-唤起 它的后继节点。signal这个状态的设置常常不是 节点⾃⼰给⾃⼰设的。是后继节点设置的。
private static boolean shouldParkAfterFailedAcquire(Node pred,Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}
代码说明:
获取 前驱节点的waitStatus的状态 ws;
若ws等于Node.SIGNAL 即 前驱节点的状态已经是SIGNAL,说明闹钟已经没了,可以直接睡了返回。
若ws⼤于0, 即状态为CANCELLED,说明前驱节点已经取消了等待锁(由于超时或者中断等原因)
既然前驱节点不等了,继续往前找,直到找到⼀个还在等待锁的节点。
然后我们跨过这些不等待的节点,直接排到等待锁的节点后⾯
else 前驱节点状态不是SIGNAL,也不是CANCELLED,⽤CAS设置前驱节点的ws为Node.SIGNAL, 给⾃⼰设定⼀个闹钟
parkAndCheckInterrupt
将线程挂起,等待被唤醒
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//线程被挂起,停下不往下执⾏
return Thread.interrupted(); }
注意:LockSupport.park 执⾏完成后线程就被挂起,除⾮其他线程unpark了当前线程,或者当前线程被中断,,否则代码是不会往下执⾏的。
总结:
AQS 中⽤statuss 属性表示锁,如果能成功将state属性通过CAS操作从0->1 获取锁获取锁的线程才能将exclusiveOwnerThreads 设置⾃⼰
addWaiter负责将当前等待锁的线程包装 成Node,并成功地添加到队尾,这⼀点由它调⽤的enq⽅法保证,enq⽅法同时还负责在队列
acquireQueued⽅法⽤于Node成功⼊队,继续尝试获取锁(取决于Node的前驱节点是不是head),或者将线程挂起
shouldParkAfterFailedAcquire ⽅法⽤于保证当前线程的前驱节点的waitStatus属性为SIGNAL,从⽽保证⾃⼰挂起后,前驱节点会负责在合适的时候唤醒⾃⼰
parkAndCheckInterrupt⽅法⽤于挂起当前线程,并检查中断状态如果最终成功获取锁,线程会从lock() ⽅法返回,继续往下执⾏,否则 线程会阻塞等待
往期推荐
END
以上是关于AQS之lock源码分析的主要内容,如果未能解决你的问题,请参考以下文章
Java高并发编程实战6,通过AQS源码分析lock()锁机制
Java高并发编程实战6,通过AQS源码分析lock()锁机制