ReentrantLock以及 Condition深度解析
Posted 小猪快跑22
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ReentrantLock以及 Condition深度解析相关的知识,希望对你有一定的参考价值。
前言
之前写过关于AQS的文章,也讲了关于 ReetrantLock
的源码分析。这里主要是想结合源码分析下Condition
的 await 和 signal ,写ReentrantLock那么还是得提下 AQS 的。
一、关于AQS 中几个重要的属性
- AQS 中维护了一个很重要的变量 state, 它是int型的,表示加锁的状态,初始状态值为0;
- 另外 AQS 还维护了一个很重要的变量exclusiveOwnerThread,它表示的是获得锁的线程,也叫独占线程,ReentrantLock中的可重入就是用到该属性,当 state != 0 但是 exclusiveOwnerThread == 当前线程,那么就可以不需要再次走获取锁的逻辑即可重入。
- AQS中还有一个用来存储获取锁失败线程的队列,它是一个FIFO的双链表结构,以及 head 和 tail 结点。
- 双链表的结点 Node 包含的主要属性如下:
Node
: SHARED 和 EXCLUSIVE : 用于标识该结点是共享模式还是独占模式
int : waitStatus
: 标识该结点的等待状态,有如下几种值:
CANCELLED:值为1,表示该结点被取消了,比如使用 tryLock(1000, TimeUnit.MILLISECONDS) 获取锁超时就会被取消
SIGNAL: 值为-1,表示此结点的后继结点的线程已经通过LockSupport.park()挂起了,需要前面的结点在释放锁或者取消的时候来唤醒该线程
CONDITION: 值为-2,表示该结点在condition队列中,一般是通过 condition.await() 操作,然后该结点插入 condition 的尾部
PROPAGATE: 值为-3,共享模式的头结点可能处于此状态,表示往下传播。假设有2个线程同时释放锁,通过竞争,其中一条负责唤醒后继结点,而另一条则将头结点设置为此状态,新结点唤醒后,直接根据头结点的此状态来唤醒 下下个结点。
注意,当 new Node()
的时候 waitStatus 默认等于0。
二、ReetrantLock 的结构
- ReetrantLock 包含公平锁和非公平锁,默认的是非公平锁。
- ReetrantLock 包含一个抽象的内部类: Sync,Sync 继承了 AQS,如下:
abstract static class Sync extends AbstractQueuedSynchronizer
- ReetrantLock 中的 公平和非公平锁分别继承自 Sync
三、ReetrantLock 的获取和释放锁的分析
这里就以默认的非公平锁的源码来分析,版本为 jdk1.8
。
ReentrantLock lock = new ReentrantLock()
try {
lock.lock()
...
} finally {
lock.unLock()
}
和 synchronized
不一样,ReentrantLock
是不会主动释放锁的,所以需要在 finally
块中主动调用 unLock
释放。
这里假设有A,B,C,D 4个线程同时调用 lock 获取锁,且每个线程获取锁之后执行的任务都非常耗时。
1. lock.lock()
- 会调用
ReentrantLock
的 lock 方法,如下:
public void lock() {
sync.lock();
}
由于默认的是非公平锁,所以 调用的是 NonfairSync
的 lock 方法,如下:
final void lock() {
// state 的默认值是0,所以第一个线程来获取锁的时候 cas 操作返回true
// 线程释放锁的时候会把 state 再次设置为 0
if (compareAndSetState(0, 1)) // 注释【1】
// 把获取锁的线程设置为独占线程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // 注释【2】
}
由于线程 A
是第一个请求锁的,所以通过 CAS 把 AQS 的 state
值设置为1,表示 线程A已经获取了锁, 然后
此时,线程B
再来获取锁的时候 走到注释【1】
,此时state的值为1,所以cas 操作会失败,接着走到 注释【2】
,acquire
是 AQS 里面的方法,如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上面的方法包含了3个重要的步骤:
tryAcquire
: 再次尝试去获取锁,该方法在是在 ReentrantLock 的内部类NonfairSync
中。addWaiter
:把当前线程转化为Node结点,且插入等待队列的末尾,如果是第一个插入等待队列的结点,还需要 new 一个空的结点作为head
结点。acquireQueued
:这个方法 判断 node 结点的前驱结点是否是head
结点,如果是,那么再次尝试去获取锁,如果获取锁成功,那么设置当前的 node 为head
结点;如果获取锁失败,那么会阻塞(挂起)当前线程。
tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 表示之前获取锁的线程已经释放了锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 表示之前获取锁的线程未释放锁,然后判断当前线程是否就是之前获取锁的线程
// 如果是,那么state + 1,这里就是锁的重入了
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
tryAcquire
获取锁成功则返回 true,否则返回 false 。下面看看addWaiter(Node.EXCLUSIVE)
, Node.EXCLUSIVE 前面也说过,指的是独占锁,代码如下:
private Node addWaiter(Node mode) {
// new Node ,Node 的thread 赋值为当前线程
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 第一个获取锁失败的线程走到这里时,head 和 tail 都等于null
if (pred != null) {
// 不等于null,说明等待队列里面有等待的线程了,
// 下面的操作就是把 node 插入到队列的而尾部然后返回 node
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 走到这里,说明等待队列为空
enq(node);
return node;
}
由于 线程B
是第一个未获取到锁的线程,此时 head
和 tail
都是 null,所以走到 enq(node)
:
private Node enq(final Node node) {
for (;;) {
Node t = tail; //注释【1】
if (t == null) { // 注释【2】
if (compareAndSetHead(new Node())) //注释【3】
tail = head; //注释【4】
} else {
node.prev = t; //注释【5】
if (compareAndSetTail(t, node)) { //注释【6】
t.next = node; //注释【7】
return t; //注释【8】
}
}
}
}
这里画图来展示吧:
注释【1】:由于此时 tail == null ,所以注释【2】成立;
注释【3】:new 一个空的结点,然后通过 cas 操作把这个空结点设置为 head,此时,head 结点的 waitStatus 值为0(默认值)
注释【4】:让 tail 指向 head,执行完 的效果如下:
此时 head 和 tail 都指向的是一个空结点,里面对应的线程为 null,且 waitStatus = 0
;
for 循环会再次走到 注释【1】
,t 赋值为 tail,所以注释【2】
条件不成立,走到 注释【5】
,即 线程B 的prev
指向t
也就是指向 tail
,如下图:
接着走到注释【6】
通过cas 操作让 tail 指向 线程B对应的 node 结点。
注释【7】的作用就是 让 t
的 next
指针指向 结点 node,然后返回 node 结点,如下:
到此,addWaiter(Node mode)
分析完成了,下面就说说acquireQueued(final Node node, int arg)
,代码如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 注释【1】
if (p == head && tryAcquire(arg)) { // 注释【2】
setHead(node); // 注释【3】
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())// 注释【4】
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
注意,这里面也是一个无限循环操作,此时线程B已经在等待队列里面了:
注释【1】:此时 node 结点的前驱结点是 head
,所以执行 tryAcquire
,由于线程A执行耗时的操作,暂时不会释放锁,所以 注释【2】的 tryAcquire
条件不满足。走到注释【4】。
注释【4】:包含 shouldParkAfterFailedAcquire
和 parkAndCheckInterrupt
我们逐一分析,先看 shouldParkAfterFailedAcquire
,方法如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 注释【1】
/***
* 表示已经设置为SIGNAL,那么在这个pred结点释放锁的时候
* 能够唤醒pred的后继结点对应的线程
*/
return true;
if (ws > 0) {
// 表示前驱结点被取消了,那么从pred结点开始往前,
// 从等待队列中删除所有已经取消的结点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 注释【2】
// 通过 cas 把pred结点的waitStatus 设置为 SIGNAL
// 只有设置为SIGNAL后,才能唤醒后续结点
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
接着来, 线程B
对应结点的 前驱结点是 head
结点,前面说过了,head 结点的 waitStatus 的值为0,所有会走到 注释【2】,设置 head 的 waitStatus = -1(SIGNAL = -1),返回 false,返回到 acquireQueued
方法中,继续执行 for 循环,由于线程A 依然没有释放锁,还是走到 shouldParkAfterFailedAcquire方法,此时 pred 的waitStatus = SIGNAL,所已返回true,接着执行 acquireQueued
中的 parkAndCheckInterrupt
方法了,如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
这个方法很简单,调用LockSupport.park阻塞 线程B
。
以上,执行完成后,head 的 waitStatus 的值为 -1。如下图:
到此,线程B 的 获取锁的操作执行完毕,被阻塞了,现在轮到 线程C
来请求锁,流程是一样的,还是最终走到这里:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
由于线程A未释放锁,所以 线程C 尝试获取锁tryAcquire
失败,接着走 addWaiter
,此时就是将 线程C 对应的结点插入到 等待队列的尾部,插入后如下图:
然后 走到 acquireQueued
,前面分析过过代码,这里就简单说下结果。这里会先判断 线程C
结点的前驱结点是不是 head
结点,显然不是,线程C
结点的前驱结点是 线程B
对应的结点,所以会走到 shouldParkAfterFailedAcquire(pred, node)
,同样经过2次循环把 线程B
对应的结点的waitStatus值设置为1,然后阻塞线程B
。如下图所示:
同样,线程D的操作和线程C的操作是一样的,执行完获取锁的流程后如下图:
从上面的分析可知,ReentrantLock 所形成的等待队列非 tail
结点的 waitStatus = -1;
ReentrantLock 的 lock
方法总结:
- 当一个线程获取锁后,会将 state 置为1,然后 设置
exclusiveOwnerThread
为当前线程 - 如果当前线程再去获取锁的话,会把 state 加1 ,即锁重入
- 其他线程再去获取锁,如果锁未被其他线程释放,那么会把此线程对应的结点插入等待队列的末尾,然后判断此线程对应结点的前驱结点是否是
head
结点,如果是 head 结点,那么会尝试获取锁,获取到锁就直接return,未获取到锁,会把前驱结点的waitStatus
值设置为SIGNAL(-1),然后阻塞此线程;如果不是head结点,同样的逻辑 ,把前驱结点的waitStatus
值设置为SIGNAL(-1),然后阻塞此线程,对应如下的流程图:
以上是关于ReentrantLock以及 Condition深度解析的主要内容,如果未能解决你的问题,请参考以下文章
ReentrantLock与Condition构造有界缓存队列与数据栈
从使用角度看 ReentrantLock 和 Condition