多图预警带你了解ReentrantLock底层执行原理揭开AQS的神秘面纱
Posted java叶新东老师
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多图预警带你了解ReentrantLock底层执行原理揭开AQS的神秘面纱相关的知识,希望对你有一定的参考价值。
什么是AQS
AQS全名为AbstractQueuedSynchronizer
,是JDK1.5之后并发包java.util.concurrent
(简称JUC)里面的一个抽象类类,这是一个在并发编程很常用的工具类,看名字就知道,这是一个队列,并且是线程安全的队列,比较特别的是,在操作数据的时候,是使用CAS(Compare And Swap)来保证原子性的,而不是大家熟知的synchronized;使用这个AQS可以实现ReentrantLock、CountDownLatch(倒计时门栓)、Semaphore(信号量)、ReentrantReadWriteLock(读写锁)等一些列的并发辅助工具;
我们看看它是如何实现这些工具类的,很显然,它们都在里面维护了一个内部抽象类Sync,由Sync实现AQS
ReentrantLock 【独占锁】
CountDownLatch 【倒计时门栓】
Semaphore 【信号量】
ReentrantReadWriteLock 【读写锁】
AQS 的内部结构
AQS常用方法
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式(读写锁会用到)。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式(读写锁会用到)。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
如果你有兴趣,愿意折腾,那你完全可以继承AQS然后重写以上的方法来实现自己的锁机制;
·staste 锁标志位
AQS 内部维护了一个state
字段,主要用来标志是否已上锁,默认值是0,表示未上锁,一旦有线程获得锁,则会将state
字段置为1,在ReentrantLock中,当有重入锁重复获得锁时会在原来的基础上在 加1,解锁则进行减 1,减至 0时代表锁为空闲状态,即没有线程获取锁,是自由状态的;并且该字段被volatile
关键字修饰,在各个线程中保证了可见性以及防止指令重排序;
/**
* The synchronization state.
*/
private volatile int state;
·CLH队列
在AQS内部还维护了一个CLH队列,在源码中是这样介绍的:
The wait queue is a variant of a “CLH” (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
* spinlocks.
意思是这个队列由 Craig,、Landin 和 Hagersten 三个人发明,所以取每个人名字的头字母组成,最后一句话的意思是说:这个CLH队列通常用来做自旋操作;源码里面还画出了一个粗略的队列图
但其实,这个CLH要比源码图上复杂的多,这个队列是一个双向链表,也是一个双端队列,
- 双向队列 : 除了元素本身外,还维护了2个指针,一个(prev)指向上一个元素,另一个(next)指向下一个元素
- 双端队列:单端队列只有头部,如果你要找某个元素的话,就必须从头部开始,一个个节点开始往下找;双端队列既有头也有尾,你可以选择从头部开始往下找元素,也可以从尾部开始往上找元素;
在源码中,不管队头还是队尾,都是一个Node节点
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
在来看看Node节点这个类里面都有哪些玩意,以及这些玩意都是干啥的~
static final class Node {
static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile AbstractQueuedSynchronizer.Node prev;
volatile AbstractQueuedSynchronizer.Node next;
volatile Thread thread;
AbstractQueuedSynchronizer.Node nextWaiter;
}
依次介绍下Node节点中的属性都代表什么意思,有什么作用
- SHARED : 共享模式节点
- EXCLUSIVE:独占模式节点
- prev: 前置节点,这是一个指针,指向队列中上一个元素
- next:后继节点,这也是一个指针,指向队列中的下一个元素
- thread :每个线程在进入队列时都会封装成一个Node节点,而thread变量就是用来存储这个线程的指针;
- waitStatus : 等待状态,下面说明
以下是waitStatus的状态值
- CANCELLED:(waitStatus的状态)由于超时或中断,此节点被取消。节点一旦被取消了就不会再改变状态。特别是,取消节点的线程不会再阻塞。一直循环获取锁
- SIGNAL :表示这个节点已挂起在等待唤醒。,就是告诉上一个节点,执行完时间片后通知一下自己;此节点后面的节点已(或即将)被阻止(通过park),因此当前节点在释放或取消时必须断开后面的节点,为了避免竞争,acquire方法时前面的节点必须是SIGNAL状态,然后重试原子acquire,最后在失败时调用patk()方法阻塞
- CONDITION :等待条件状态,在等待队列中, 此节点当前在条件队列中。标记为CONDITION的节点会被移动到一个特殊的条件等待队列(此时状态将设置为0),直到条件时才会被重新移动到同步等待队列 。(此处使用此值与字段的其他用途无关,但简化了机制。)
- PROPAGATE:传播, 状态需要向后传播,应将releaseShared传播到其他节点。这是在doReleaseShared中设置的(仅适用于头部节点),以确保传播继续,即使此后有其他操作介入。
CLH队列的真正面目
通过以上的介绍,如果要准确地描述出CLH队列,那么,AQS的CLH队列应该是下图的样子,其中T1
、T2
、T3
就是thread
变量指向每个不同的线程
通过 ReentrantLock 解读AQS
我们都知道 ReentrantLock 是一个独占锁,那么当我们调用lock() 方法方法的时候,它里面都走了哪些事情呢?公平锁和非公平锁的区别又在哪里呢?其实还要分上锁的次数,首次上锁、第二次上锁、第三次上锁走的流程都不一样,那么接下来我们就来由浅入深地慢慢揭开它神秘的面纱!
温馨提示
流程图中都会显示使用到哪些方法,每一个方法在后面会有解释,所以大家看文章的时候不要着急,除了文字之外,我都会尽量用流程图来说明清除
非公平锁
非公平锁就像它的字面意思那样,不公平的嘛,在这个模式下,每个线程就像是强盗一样,同时去抢一把锁,谁抢到就是谁的。官方的解释是每个线程以抢占的方式进行获取锁资源,多个线程同时争抢的情况下只会有一个线程获得锁,当获取锁的线程执行完业务代码后会释放锁,并且线程会引起下一轮争抢,依次循环往复;直到队列 中的线程都执行完为止;
接下来我们运行以下代码,看看它上锁的过程中都经过了哪些流程
package com.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockTest extends Thread{
private ReentrantLock lock = null;
// 延时的时间,单位:ms
private long time ;
// 构造方法
public LockTest(ReentrantLock lock,long time) {
this.time = time;
this.lock = lock;
}
@Override
public void run() {
lock.lock(); // 上锁
try {
// 开始延时,延时代表着在执行业务逻辑所花费的时间
if(time > 0) Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock(); // 解锁
}
}
class MainClass{
public static void main(String[] args) {
// 实例化锁,构造函数的参数,默认为true:独占锁,
ReentrantLock lock = new ReentrantLock();
// 创建3个线程来争抢锁
//第一个线程,进入后延时2000秒,方便我们debug
LockTest lockTest = new LockTest(lock,2000000);
lockTest.setName("线程AA");
lockTest.start();
// 因为第一个线程独占,所以第二个会进入队列
LockTest lockTest1 = new LockTest(lock,0);
lockTest1.setName("线程BB");
lockTest1.start();
// 第三个线程也会进入队列
LockTest lockTest2 = new LockTest(lock,0);
lockTest2.setName("线程cc");
lockTest2.start();
}
}
非公平锁–第一个线程上锁过程(首次上锁)
首先,第一次上锁的时候跟队列没有任何关系,因为你第一个线程是没有人跟你抢的,轻而易举的就拿到了这把锁,然后执行业务代码,需要注意的是,此时CLH队列还没有形成,因为只有一个线程的情况下不需要队列;
非公平锁–第二个线程抢锁
为什么第一次就叫上锁,第二次就叫抢锁呢?因为啊,第一个线程每人跟它抢,直接拿就行;粗略地看,第二次抢锁过程是这样的
非公平锁会进行2次抢锁,可以说是非常简单粗暴,一上来先抢一次,抢不到在判断锁资源是否空闲,如果空闲的话在抢一次,最后抢不到才会进入队列并阻塞当前线程;使其不在自旋;注意,严格意义上说ReentrantLock不是自旋锁,它属于自适应自旋 ,不会无限自旋下去,因为自旋也是需要消耗CPU资源的,
上面的图只是一个大概流程图,因为我要由浅入深嘛,如果一下子讲的太深了,会让人觉得很复杂,接下来我们看看详细的流程
说明
· 粉红色部分表示抢锁失败的流程走向
· 绿色部分表示抢锁成功的流程走向
· 黄色部分表示注释的一些细节
· 紫色部分表示正在使用CAS尝试上锁
看到这张图有没有菊花一紧的感觉? 没错,确实很复杂,这个流程图是我一点点手动debug后画出来的;里面牵扯到的东西很绕,特别是循环这一块;当然我们只需要记住上面那个简化的流程图就好,关于细节方面的东西,你必须自己去调试过才能知道它里面流程的具体走向;但第二个线程进行抢锁失败后,会进入队列,此时,队列结构如下
有细心的童鞋会发现,这里怎么会有个空节点呢? 其实这个节点是用来占位的,给谁占位的呢?当然是给T1线程占位的啦,因为我们刚刚的线程延时了两千秒啊,足足半个小多小时呢,T1线程还在执行中,这个空节点就是T1的;
那么这时候问题又来了,它为什么要弄个空节点来占位啊? 直接把空节点删掉,把T2放进head节点里面不就行了吗?
答:其实是因为我们要用到节点里面的waitStatus
属性,就是等待状态,这个状态决定了我们要不要唤醒下一个线程,在独占模式下,这个状态只会是0
或者-1
,所以不管怎么样,它都会唤醒队列中的下一个线程;感兴趣的童鞋可以回到上面介绍waitStatus
属性值在看一遍,这里不过多介绍;
非公平锁–第三个线程抢锁
第三次抢锁时可参照上面的比较流程图做推理,主要区别就是初始化队列那一块,
- 第二个线程抢锁时,因为队列还没初始化,所以抢锁失败时会先去初始化这个队列,然后在把头尾节点指向对应的内存空间,
- 第三个线程抢锁时,因为队列已经创建好了,所以直接在队列尾部插入第三个线程的节点;
所以当第三个线程进入队列后,结构是这样的
非公平锁的加锁过程就已经讲完了,因为公平锁和非公平锁的解锁过程是一样的,所以解锁这一块单独放到最后面去介绍;接下来我们先看看公平锁的加锁流程
公平锁
公平锁其实很简单嘛,就像去银行办业务一样,每个人必须先拿号,然后在人群中排队,一直等叫到自己号的时候,就可以进去窗口办理业务了;
运行以下的代码
package com.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockTest extends Thread{
private ReentrantLock lock = null;
// 延时的时间,单位:ms
private long time ;
// 构造方法
public LockTest(ReentrantLock lock,long time) {
this.time = time;
this.lock = lock;
}
@Override
public void run() {
lock.lock(); // 上锁
try {
// 开始延时,延时代表着在执行业务逻辑所花费的时间
if(time > 0) Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock(); // 解锁
}
}
class MainClass{
public static void main(String[] args) {
// 实例化锁,构造函数的参数,默认为true:独占锁,,这里我们换成false:公平锁
ReentrantLock lock = new ReentrantLock(false);
// 创建3个线程来争抢锁
//第一个线程,进入后延时2000秒,方便我们debug
LockTest lockTest = new LockTest(lock,2000000);
lockTest.setName("线程AA");
lockTest.start();
// 因为第一个线程独占,所以第二个会进入队列
LockTest lockTest1 = new LockTest(lock,0);
lockTest1.setName("线程BB");
lockTest1.start();
// 第三个线程也会进入队列
LockTest lockTest2 = new LockTest(lock,0);
lockTest2.setName("线程cc");
lockTest2.start();
}
}
公平锁–第一次上锁
和非公平锁一样,第一次上锁时跟队列没什么关系,但是公平锁和非公平锁的第一次上锁还是有区别的,具体哪些区别我们先看流程图
通过流程图可以看到,公平锁在上锁前会先去队列里面看看有没有节点,因为公平锁是公平的,队列里面的节点要比当前线程优先,所以它必须要先把队列里面的线程执行完才能执行当前线程;因为现在是第一次上锁,队列里面肯定是没有其他线程,而且队列都还没创建,所以这个线程肯定能拿到锁;
公平锁–第二次上锁
以后不管是第二次上锁还是第三次、第四次还是第N次,除非锁已被释放,不然所有的线程都会在后面排队,因为它必须符合公平的概念:先到先得;
解锁
在解锁的层面,公平锁和非公平锁是一样的,因为队列里面就已经保证了顺序,所以,在解锁的时候,除了释放锁之外,还要将队列中的线程唤醒,当然了,唤醒也是按顺序来的,有人就会说了,不对啊,你公平锁按顺序来唤醒没问题,但是非公平锁也是这么干的吗?我想告诉你,还真是这么干的,因为非公平锁,在上锁的时候就已经抢过锁资源了,已经达到了非公平的概念,所以,这边唤醒的时候就是公平的了!接下来我们看看解锁流程
相对于加锁,解锁要简单地多,流程也不复杂,代码的可读性也高,上面这张图,是我经过精细调整过的,只要你认识字,就一定能看懂;
源码解析
非公平锁的 lock()
方法
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
非公平的简单粗暴就是在这里,一上来不管三七二十一,先抢一次,抢到了,直接执行,抢不到在执行acquire(1)
方法;
compareAndSetState(0, 1)
表示使用cas抢锁,内部逻辑是判断state
是否为0,如果是0的话,就将其改为1 ,state =0
表示锁是空闲状态,state =1
表示锁已被线程独占;能进入到setExclusiveOwnerThread(Thread.currentThread());
方法就表示cas已经抢到锁了, 接下来就是设置标志位,方法内容如下
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
可以看到里面非常简单, 就是设置一下属性,将获取锁的线程指向当前线程,
综上所述,lock()
方法的流程走向如下
公共 acquire(int arg) 方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire()
方法是AQS的方法, 在if判断语句里面,它先调用了tryAcquire(arg)
方法, 这个方法是用来抢锁的,但是在非公平锁里面,它不是一定会抢,这得看条件,里面的方法我们一会在说;需要注意的,可以看到tryAcquire(arg)
方法的前面有个感叹号!
,这意思是说,如果抢不到的话,我就执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法,这个方法主要是做入队和阻塞的,参数中还有个addWaiter
方法,我们上面讲到的空节点,就是在这个方法里面创建的;下面会细讲里面的流程,综上所述,acquire()
方法里面的流程走向如下
非公平锁 tryAcquire() 方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
tryAcquire()
中间做了一层转换,其实调用的是nonfairTryAcquire()
方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
进入方法后,先去执行 c = getState()
来获取state
的状态,刚刚我们讲到,0表示空闲,1表示以上锁;如果是空闲状态的话,就在抢一次锁,compareAndSetState(0, acquires)
方法就是用来上锁的,抢不到则判断当前线程是否获得独占锁的线程,如果是,就代表着锁正在重入,重入锁的逻辑就在这里面判断的,重入锁也叫递归锁,感兴趣的可以看我另外一篇文章:原来java有这么多把锁,图解java中的17把锁 ,里面有详细的介绍;综上所述,tryAcquire()
方法流程走向如下
addWaiter(Node mode)方法
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
此方法也是AQS的方法,这个主要的作用就是在队尾追加节点,做的都是一些变量指向工作,先判断tall队尾是否存在,如果存在则将当前节点追加到队尾;没啥好说的,最重要的是enq方法。下一步就做这个方法的解析
enq(node)方法
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
进入enq()
后直接就是一个循环, 但是大家不要被它的for (;;)
给蒙蔽了, 虽然看起来像个死循环,但是循环最多只会循环2次;第一次循环执行时,会先判断队列是否存在,但是它判断的是tail
(队尾)是否为空,这是因为队头和队尾是相对应的,队头为空的话,队尾一定是空的,相反,如果队头有值,队尾也一定有值,所以第一个if
里面,如果队列为空,就是把队头给创建出来,这里创建的就是刚刚说的空节点
,仔细看这行代码compareAndSetHead(new Node())
,直接new 一个Node节点,但是不做其他的任何处理,只是一个单纯的空节点,刚刚说过了,这个空节点是给当前获得锁的线程占位的;tail = head
队头和队尾都指向同一块内存,它们俩都是空节点,这时候队列的雏形就形成了;接着往下一个循环,第二次循环执行就是else里面的内容了,因为刚刚第一次已经把队列创建出来了,最关键的就是compareAndSetTail(t, node)
这行代码,它将当前线程塞到队尾,队头还是空节点,执行完就退出循环,此时队列的结构如下
综上所述,enq()
方法的执行流程走向如下
acquireQueued(final Node node, int arg)方法
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} f以上是关于多图预警带你了解ReentrantLock底层执行原理揭开AQS的神秘面纱的主要内容,如果未能解决你的问题,请参考以下文章
听说你在写Python爬虫,你对浏览器的开发者工具了解多少?多图预警