[源码分析]ReentrantLock & AbstractQueuedSynchronizer & Condition
Posted noking
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[源码分析]ReentrantLock & AbstractQueuedSynchronizer & Condition相关的知识,希望对你有一定的参考价值。
首先声明一点: 我在分析源码的时候, 把jdk源码复制出来进行中文的注释, 有时还进行编译调试什么的, 为了避免和jdk原生的类混淆, 我在类前面加了"My". 比如把ReentrantLock改名为了MyReentrantLock, 在源码分析的章节里, 我基本不会对源码进行修改, 所以请忽视这个"My"即可.
一. 简介
锁是什么? 锁是一种标志, 或者是一种资源, 持有锁的线程才可以继续往下执行相应的内容. 未持有锁的线程需要等待这个锁资源. 直到获取到了这个锁, 才可以继续向下执行.
0. ReentrantLock的一个小demo
想自己运行这段代码的话, 把代码中的"MyReentrantLock" 改为 "ReentrantLock" 即可. (后续的代码也一样, 如果想自己运行, 还编译报错, 请把我修改的代码改回来. 也就是把"My"都去掉就好了)
public class Main { private static MyReentrantLock lock = new MyReentrantLock(); public static void main(String[] args) throws Exception { // 场景如下: 线程1先获得锁, 释放后, 线程2 再获得锁. new Thread(() -> { System.out.println("线程1启动"); lock.lock(); System.out.println("线程1抢到锁"); try { System.out.println("这里是业务逻辑1"); quietSleep(2);// 两秒后释放锁 System.out.println("两秒后"); } finally { lock.unlock(); System.out.println("线程1释放锁"); } }).start(); new Thread(() -> { System.out.println("线程2启动"); quietSleep(1); // 在这里进行谦让. 确保上面的线程能先运行. 也就是让上面的线程先获得锁 lock.lock(); System.out.println("线程2抢到锁"); try { System.out.println("这里是业务逻辑2"); } finally { lock.unlock(); System.out.println("线程2释放锁"); } }).start(); } public static void quietSleep(long sec) { try { Thread.sleep(sec * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
输出的结果如下:
1. sync字段
首先来看一下ReentrantLock里唯一的一个字段
Sync继承自AQS(AbstractQueuedSynchronizer, 以下简称AQS) . 公平锁和非公平锁都继承了Sync. Sync是ReentrantLock类里锁的统一声明.
2. lock/unlock依赖Sync
ReentraintLock的 lock()和unlock()方法实际上都是靠Sync来实现的:
3. 锁内部类定义
Sync 和 公平锁 和 非公平锁 都是ReentrantLock的内部类, 类的定义部分如下(细节先隐藏起来了, 后面会讲):
4. ReentrantLock构造器
ReentrantLock有两个构造器.
1. 默认构造器是直接使用了非公平锁. 非公平锁就是不一定按照"先来后到"的顺序来进行争抢.
2. 带参构造器可以传递一个bool类型. true的时候为公平锁. 公平锁就是按照"先来后到"的顺序来进行争抢.
二. 公平锁申请锁
使用锁的第一个步骤, 当然就是先申请锁了, 咱么来分析一下源码, 看看申请锁的流程吧.
1. 公平锁获取锁的流程(单线程, 没有争抢)
首先从最外层的调用lock()方法开始. 咱们在Main方法里写下这两行代码:
MyReentrantLock就是ReentrantLock, 我复制了源代码, 然后改了个名字而已.
Reentraint类的lock()方法最终还是调用的sync.lock()
由于我们现在使用的是公平锁. 所以sync现在是FairSync. 所以sync.lockI()实际上就是FairSync类里的lock()方法
发现lock()调用的是acquire(1)这个方法, 这个方法是在AQS类里实现的.代码如下:
arg当时传进来的是1, 所以首先进行的是tryAcquire(1)来进行"尝试获取锁"的操作. 这时一种乐观的想法.
tryAcquire方法的具体实现在FairSync类里, 具体代码如下:
/** * @return 返回true: 获取到锁; 返回false: 未获取到锁 * 什么时候返回true呢? 1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取 * @implNote 尝试直接获取锁. */ protected final boolean tryAcquire(int acquires) { // 获取当前线程的引用 final Thread current = Thread.currentThread(); // 当前锁的计数器. 用于计算锁被获取的次数.在重入锁中表示锁重入的次数.由于这个锁是第一次被获取, 所以c==0 int c = getState(); // c==0, 也就是 state == 0 ,重入次数是0, 表示此时没有线程持有锁. if (c == 0) { // 公平锁, 所以要讲究先来后到 // 因为有可能是上一个持有锁的线程刚刚释放锁, 队列里的线程还没来得及争抢, 本线程就乱入了 // 所以每次公平锁抢锁之前, 都要判断一下等待队列里是否有其他线程 if (!hasQueuedPredecessors() && // 执行到这里说明等待队列里没有其他线程在等待. // 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了, // 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_= compareAndSetState(0, acquires)) { // 到这里就获取到锁了,标记一下,告诉大家,现在是我(当前线程)占用了锁 setExclusiveOwnerThread(current); // 成功获取锁了, 所以返回true return true; } //-- 由于现在模拟的是单纯地获取一次锁, 没有重入和争抢的情况, 所以执行不到这里, 上面的cas肯定会成功, 然后返回true } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
争抢完锁之后会返回true, 然后回到上层方法acquire :
if语句里 && 前面是false, 不会继续往下执行了. 当前线程获取到了锁, 而且执行了所有该执行的内容, 就完事儿了.
2. 公平锁进行重入的流程
重入就是一个线程获取到了锁, 然后这个线程又一次申请(进入)了这个锁.
重入用synchronized来举例就是这样:
用ReentrantLock来举例子就是这样:
同一个线程(main线程) 首先进行了lock.lock()申请并占有了锁, 随后又执行了一次lock.lock(). 还没释放锁的情况下, 又一次申请锁. 这样就是重入了.
上面一小节已经分析了第一行的lock.lock()是如何获取到锁的, 所以我们只分析 重入的部分, 也就是后面那句lock.lock()的执行流程.
前面的执行过程一直是一模一样的, 直到这里:
/** * @return 返回true: 获取到锁; 返回false: 未获取到锁 * 什么时候返回true呢? 1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取 * @implNote 尝试直接获取锁. */ protected final boolean tryAcquire(int acquires) { // 获取当前线程的引用 final Thread current = Thread.currentThread(); // 当前锁的计数器. 由于前面的那句lock已经获取到锁了, 所以这里是status==1, 也就是 c==1 int c = getState(); // c==1, 表示当前有线程持有锁, 所以这段if是进不去了 if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } // 由于 c==1 , 无法进入if语句, 所以来看看满不满足这里的 else if // 这个锁被人占了, 但还是不死心, 于是看一下是不是当前线程自己占的这个锁. // (人家女生说有喜欢的人, 为什么不问问是不是自己呢 = =.) // 由于是同一个线程, 所以就是自己啦! 所以会进入这个else if分支, } else if (current == getExclusiveOwnerThread()) { // 代码执行到这里了, 就是所谓的 重入 了 // 这里的acquires的值是1, 所以nextc = 1 + 1 , 也就是2了 int nextc = c + acquires; // 小于0, 说明int溢出了 if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 在这里把状态更新一下, 把state更新为2, 意思就是这个锁被同一个线程获得2次了. // (大家就可以以此类推, 下次再重入的话, 那么就会再+1, 就会变为3....) setState(nextc); // 重入完成, 返回true return true; } return false; }
还记得上小节讲的, 获取锁的时候进入的是这段代码的if语句, 而重入就不一样了, 进入的是 else if语句. 但最终返回的还是true, 表示成功.
上面讲的是无争强的情况, 接下来讲讲有争抢的情况.
3. 公平锁cas争抢失败
场景如下:
一开始锁是空闲状态, 然后两个线程同时争抢这把锁(在cas操作处发生了争抢).
一个线程cas操作成功, 抢到了锁; 另一个线程cas失败.
代码例子如下(代码的意思到位了, 但是这段代码最后不一定会在cas处进行争抢, 大家意会就好了):
cas操作成功的线程就和上面第1小节的一样, 就不用再重复描述了.
而cas争抢失败的线程会何去何从呢? 看我给大家分析:
/** * @return 返回true: 获取到锁; 返回false: 未获取到锁 * 什么时候返回true呢? 1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取 * @implNote 尝试直接获取锁. */ protected final boolean tryAcquire(int acquires) { // 获取当前线程的引用 final Thread current = Thread.currentThread(); // 当前锁的计数器. int c = getState(); // state == 0 表示此时没有线程持有锁 if (c == 0) { // 本场景中, 一开始锁是空闲的, 所以队列里没有等待的线程 if (!hasQueuedPredecessors() && // 两个线程在这里进行争抢 // cas抢成功的会进入到if代码块 // cas抢失败的, 就跳出整个if-else, 也就是直接到最后一行代码 compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // cas 操作失败后, 会这直接执行到这里. 返回false. return false; }
在这里返回了false, 回到上一层函数.
第一个条件是true, 所以会继续往下执行acquireQueued方法. 来准备让这个失败的线程进入队列等待.
下面继续来给大家讲解 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) .
先讲讲这个addWaiter(Node.EXCLUSIVE):
/** * 将当前线程封装为Node, 然后根据所给的模式, 进行入队操作 * * @param mode 有两种模式 Node.EXCLUSIVE 独占模式, Node.SHARED 共享模式 * @return 返回新节点, 这个新节点封装了当前线程. */ private Node addWaiter(Node mode) { // 这个mode没用上. Node node = new Node(Thread.currentThread(), mode); // 咱们刚才都没见到过tail被赋予了其他的值, 当然就是null了. Node pred = tail; // tail是null的话, pred就是null, 所以不会进入到这个if语句中.所以跳过这个if语句. if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 因为锁的等待队列是懒初始化, 直到有节点插入进来, 它才初始化. // 而现在这个挣钱失败的线程, 正好是锁建立以来, 第一个进入等待队列的线程. 所以现在才准备进行初始化. // 初始化完了后会把当前线程的相关信息和引用封装成Node节点, 然后插入到队列当中.并且制定head 和 tail. // tail就不等于null了, 所以下一次addWaiter方法被调用的时候, 就会执行上面的if语句了. 而不会跳过if语句, 来到这里进行初始化了. enq(node); // 返回这个Node节点. return node; }
目的就是要将这个cas失败的线程封装成节点, 然后插入到队尾中. (等待队列是懒初始化,)
如果队列已经初始化了, 那么tail就不会是null, 就会执行上面代码中的if语句, 调整一下指针的引用就好了.
但是如果队列还未初始化, 那么就应该先初始化, 再插入. 先初始化,再插入, 对应的代码是enq(node).
接下来讲解一下enq方法:
/** * 采用自旋的方式入队 * CAS设置tail,直到争抢成功. */ private Node enq(final Node node) { for (; ; ) { Node t = tail; // 最开始tail肯定是null, 进入if进行初始化head和tail. if (t == null) { // Must initialize // 设置head 和tail. cas来防止并发. if (compareAndSetHead(new Node())) tail = head; // if 语句执行完了后, 之后的for循环就会走else了. } else { // 争抢入队, 没抢到就继续for循环迭代.抢成功了就可以return了,不然一直循环. // 为什么是用cas来争抢呢? 因为怕是多个线程一起执行到这里啊 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
刚才的addWaiter(Node.EXCLUSIVE) 分析完了, 总之就是addWaiter之后, 队列肯定是被创建完了, 而且还把node(当前线程的封装)插入到了队列的队尾. 并且返回了这个node. acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 可以简化为 acquireQueued(node)
所以继续分析acquireQueued方法.
final boolean acquireQueued(final Node node, int arg) { // node是刚才addWaiter方法插入到队尾的节点 // arg 是 1 boolean failed = true; try { boolean interrupted = false; for (; ; ) { // 获取node节点的前驱. final Node p = node.predecessor(); // 如果node节点的前驱是head if (p == head // 那么可以再尝试着抢一下锁. // 等待队列里的第一个节点很乐观, 因为确实很有可能会马上轮到他 && tryAcquire(arg)) { // 如果这个node就是那么巧合, 刚刚锁被释放了, 这回重新抢就真的抢到了 // 那么就把当前节点设为头结点.(头结点的含义就是当前持有锁的线程) setHead(node); // 上一个节点既然已经释放了锁, 也就该GC了. 置为null, 方便GC收集 p.next = null; // help GC // 很明显是获取锁成功了啊, 所以failed = false failed = false; // 这么大一段代码, 只有这一处return return interrupted; } //---- 如果不是队头, 那么就会执行到这里. //---- 或者虽然作为等待队列里的第一名, 单由于持有锁的线程还是没有释放, 所以还是没抢到锁. 那么也会执行到这里 // 获取锁失败的时候是否该阻塞 if (shouldParkAfterFailedAcquire(p, node) // 在这里阻塞, 等待唤醒 && parkAndCheckInterrupt()) interrupted = true; } } finally { // 上面那段, 如果中途异常了的话, 就会执行到这里. (一般不会到这里的) if (failed) cancelAcquire(node); } }
上面这段代码中shouldParkAfterFailedAcquire方法 和 parkAndCheckInterrupt() 方法 还未解释. 一个一个来.
/** * 当前线程没有抢到锁,是否需要挂起当前线程 * * @param pred 前驱结点 * @param node 当前结点 * @return 如果线程需要被阻塞, 那么就返回true */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起,直接可以返回true if (ws == Node.SIGNAL) return true; // 大于0, 其实就是等于1, Node.CANCELLED 是 1, 因为状态中只有这个状态是大于0的...说明前驱节点取消了排队 // 所以下面这块代码说的是, 在链表中从prev结点开始, 往前删掉CANCELLED状态的结点. // 只有CANCELLED状态值大于0 if (ws > 0) { do { node.prev = pred = pred.prev; // 删掉之后再往前看看, 看看前面是不是CANCELLED, 如果是, 那还得继续往前删 } while (pred.waitStatus > 0); pred.next = node; } else { // 在前面的两个if语句中排除掉了waitStatus值为-1和1的情况, // 只剩下0,-2,-3这三个状态了 // 然而在我们前面的源码中,都没有看到有设置waitStatus的, // 所以只剩下等于0的情况了 // 下面的操作就是, 如果waitStatus等于0, 那么就用cas将前驱结点的waitStatus设置为-1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
shouldParkAfterFailedAcquire里的前两行是在判断前驱节点prev的状态. 但是之前咱们分析代码, 并没有发现哪里设置了waitState.
所以waitState是默认值0.
所以shouldParkAfterFailedAcquire会直接执行下面的else, 在这里吧pred的waitState设置为-1, 然后返回false.
回到刚才的acquireQueued方法. 由于外层是for循环, 会在下一次for循环在此执行到shouldParkAfterFailedAcquire方法.
由于刚才已经把前驱节点prev的waitState改为1了, 所以这次在前两行判断prev的waitState时, 直接就满足条件, 然后return true了.
shouldParkAfterFailedAcquire方法return true了, 才会往下执行parkAndCheckInterrupt方法.
下面是parkAndCheckInterrupt()方法. 最终返回Thread.interrupted(). 返回线程是否被中断. (中断和挂起不是一回事 )
/** * 在这里线程阻塞. * 被唤醒的时候会返回, 如果被中断过, 那么就返回true * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { // 挂起. MyLockSupport.park(this); return Thread.interrupted(); }
LockSupport.park(this)会挂起当前线程. 但是LockSupport.park还有一个隐藏功能. 就是, 如果先对一个线程unpark, 再对这个线程park, 那么这次的park是失效的. 下一次park才会挂起.
原因就是, 对一个没有被park的线程进行unpark的时候, 会把标志位perm置为1. 而每次park的操作, 都是先去检查perm是否为1.
如果是1, 那么置为0, 并且这次不挂起.
如果perm为0, 那么就直接挂起这个线程.
4. 公平锁由于队列内有元素而失败
demo如下. 前两个线程, 其中一个获取锁成功, 另一个失败, 然后进入等待队列.
稍后, 第三个线程来获取锁, 但是这时由于等待队列中已经有元素在等待了. 所以会直接失败, 然后会被插入到等待队列的尾部.
上面的main方法中总共有三个线程想要占有锁. 前两个锁的争抢在上小节就已经模拟过了.
咱么现在只分析第三个线程申请锁的流程. 这个场景下的tryAcquire方法如下(会直接返回false):
/** * @return 返回true: 获取到锁; 返回false: 未获取到锁 * 什么时候返回true呢? 1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取 * @implNote 尝试直接获取锁. */ protected final boolean tryAcquire ( int acquires){ // 获取当前线程的引用 final Thread current = Thread.currentThread(); // 当前锁的计数器. int c = getState(); // 不会走这的if语句, 因为锁被其他线程占有, 肯定不是0 if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 由于队列内有元素, 所以if语句不执行, // 由于不是重入, else if 也不执行. // 直接返回false return false; }
这段方法返回false, 说明需要执行这个. acquireQueued(addWaiter(Node.EXCLUSIVE), arg). 先看看addWaiter方法有什么区别.
/** * 将当前线程封装为Node, 然后根据所给的模式, 进行入队操作 * * @param mode 有两种模式 Node.EXCLUSIVE 独占模式, Node.SHARED 共享模式 * @return 返回新节点, 这个新节点封装了当前线程. */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 以下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后 Node pred = tail; // 如果tail不是空, 说明有头结点.说明这个队列已经被初始化了. // 因为本小节讲的就是: 因为公平锁的等待队列中有其他线程才导致当前线程争锁失败, 所以说明等待队列不仅被初始化了, 而且里面还有元素. if (pred != null) { // node设置自己的前驱为pred node.prev = pred; // 用CAS把当前节点node设置为队尾, 如果成功后,tail指针就指向了node if (compareAndSetTail(pred, node)) { // 如果cas争抢成功, 那么就会在这里返回.(而cas失败的, 会跳过这个if代码块, 会执行到下面的enq方法) // 剩下的就是整理一下链表数据结构的连接问题了 // pred调整自己的后继为node pred.next = node; return node; } } // 如果在上面的cas中设置失败, 那么还是会执行到这里. // 然后在enq方法里靠for循环+cas的形式, 不断尝试着插入到队尾. enq(node); return node; }
后续执行的就和上小节的一样了.就不重复了...
当然, 场景是举不完的, 举完的话就跟笛卡尔积那样了. 我这里只是靠这四个例子来尽量完整地分析了获取锁的流程.
三. 公平锁释放锁
刚才申请锁的流程. 但是争抢失败的那些线程, 最后都进入到了等待队列里, 然后就杳无音讯了.
那当前持有锁的线程释放锁后, 是如何唤醒等待队列里的线程, 让下一个线程获取锁的呢?
咱么接下来分析一下释放锁的过程吧.
1. 申请1次锁, 执行一些业务, 然后释放
咱们只关注unlock, lock就跳过了, 前面讲过了.
ReentrantLock类的lock()方法 代码如下:
而这个release是AQS里的方法. 源码如下:
其中arg变量值是1. 首先会执行tryRelease(1) 来尝试释放锁.
如果尝试成功了, 那么tryRelease(1)就会返回true, 就会继续执行if代码块里的内容.
如果尝试失败了, 那么tryRelease(1)就会返回false. 然后就会跳过if语句, 最终本段方法(release方法)也会返回false.
咱们先分析一下tryRelease方法吧(tryRelease方法的源码在Sync抽象类里):
protected final boolean tryRelease(int releases) { // releases == 1 // c 就是重入次数 -1 , 由于本场景下模拟的是简单的获取一次锁, 然后释放, 不涉及到重入. 所以getState() == 1 // 所以c = 1 - 1 , c现在等于0 int c = getState() - releases; // 判断当前的线程是不是持有锁的线程, 不然抛异常. // 这是为了其他的线程捣乱. if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 用于标记是否可以完全释放锁 boolean free = false; // c等于0, 说明没有重入了, 可以完全释放了. if (c == 0) { // 标记一下, 准备完全释放锁 free = true; // 把锁的持有者设置为空, 表示锁被释放. setExclusiveOwnerThread(null); } // 把刚才c==0 设置为state setState(c); // 表示是否完全释放. 本场景下返回true return free; }
由于返回的是true, 所以返回后还有if语句块要执行:
接下来分析一下其中的unparkSuccessor方法, 看看他是如何唤醒下一个节点的.(这个方法在AQS里)
unpark之后, 就会把之前park(挂起)的线程激活, 然后继续执行:
如果线程被中断了, 那么parkAndCheckInterrupt()方法会返回true, 然后就会执行interrupted = true 这句话.
挂起和中断不是一回事, 一般不会被中断的. 所以一般不会执行interrupted=true这句话.
外层是个for循环, 当前线程被激活后, 作为等待队列中的第一个线程, 来进行获取锁. 由于是公平锁, 所以可以放心拿到, 没有人会抢, 所以会正常获取到锁.
2. 重入锁的释放
释放重入的锁(同一个线程多次获取的锁), 执行流程唯一不同的就是tryRelease方法了, 其他的都一样, 可以直接参考上面一小节的.
咱么看看重入的时候, tryRelease是如何执行的吧.
protected final boolean tryRelease(int releases) { // 其实就是重入计数器 -1 // 而由于本线程获取了2次这个锁, 所以state字段的值为2 // 所以c = 2 - 1 // 所以现在c == 1 int c = getState() - releases; // 判断当前的线程是不是持有锁的线程, 不然抛异常. if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 用来标记是否完全释放锁 boolean free = false; // c现在等于1, 不会进入这个if代码块 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 设置重入计数器, 也就是让 state =1 setState(c); // 返回false. return free; }
本小节和上小节的区别就是这段代码了.既然这段方法返回false, 那么返回后, release方法的if代码块自然也就不执行了.
四. 非公平锁的获取
刚才讲解了公平锁, 那么接下来讲讲非公平锁, 到底是怎么个不公平呢?
1. 非公平锁与公平锁获取的区别
由于非公平锁的获取与公平锁的获取, 只有一点点区别. 所以咱么只分析出区别就好了, 其他的部分都一样的.
然后会调用到NonfairSync类里的lock()方法.
这里就体现出了区别.
公平锁里的lock()方法里面, 只有acquire(1).
而非公平锁在acquire(1)之前多了一次cas操作. 一上来就尝试着抢占锁, 看看有没有机会(万一真的这个时候持有锁的线程正好把锁释放了呢). 非公平锁根本不管是否有其他人在排队.上来就是一抢.
当这次cas失败了, 才会像公平锁一样进入acquire(1)方法:
这里和公平锁一样. 只是, 非公平锁的tryAcquire方法和公平锁的tryAcquire方法内部实现不一样.
看看非公平锁的tryAcquire方法吧:
咱们继续往下看看nonfairTryAcquire方法吧:
/** * 不公平地尝试获取锁. * 不公平的语义就是: 不用判断队列里是否有其他线程在等待, 直接抢. */ final boolean nonfairTryAcquire(int acquires) { // 获取当前线程的引用 final Thread current = Thread.currentThread(); // 当前线程的重入次数 int c = getState(); // 如果是0, 表示此时此刻锁还被被任何一个线程所占用 if (c == 0) { // 当c==0的时候, 公平锁锁是先判断队列里是否有其他线程在等待, 如果没有, 再去cas争抢. // 而非公平锁这里, 就是根本就不去理会等待队列, 自己抓到机会就赶紧抢 // cas来争抢, 让重入次数变1. // 用cas是因为这个地方会发生并发. // 多个抢占当然只有一个成功了 if (compareAndSetState(0, acquires)) { // 设置锁的拥有者为当前线程. setExclusiveOwnerThread(current); return true; } // 如果不是0, 说明锁被某一个线程占用了 // 既然被占用了, 那就有两种情况: 1. 被自己占用; 2. 被别的线程占用 // 所以先看看是不是自己占用的, 如果是自己占用的, 那就重入. } else if (current == getExclusiveOwnerThread()) { // 其实就是+1 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 这里不会产生争抢, 不必用cas // 因为只有占用锁的这一个线程才能进入到这个else if 里 // 一个线程不可能发生争抢 setState(nextc); return true; } // 1. 如果在if里的cas争抢失败 // 2. 或者是不满足else if的条件 // 那就会直接返回false // 不管是成功还是失败, 都不会有线程的等待阻塞之类的. 都是立即返回. return false; }
这里的非公平锁的nonfairTryAcquire方法 和 公平锁的tryAcquire方法很像. 区别就是:
非公平锁是, 当c==0. 也就是此时此刻, 锁是空闲状态的时候. 直接就尝试着用cas来争抢锁, 看看是否能成功, 而不管等待队列是否还有其他线程再等待.
而公平锁在c==0的时候, 也就是state==0 的时候, 先去看看队列里是否有其他线程再等待, 如果队列里没有其他线程在等待, 才会去cas争抢. 不然就会把机会让给队列里的第一个线程, 而自己会进入到等待队列的尾部.
为什么c==0了, 队列里还有可能会有其他的元素在等待呢?
因为c==0只是说明当前锁的状态是空闲状态. 只是上一个线程刚刚把锁释放, 当前线程就来争抢锁了, 还没来得及唤醒等待队列里的第一个线程呢.
其他地方就跟公平锁都一样的, 就是多了本小节讲的两处cas.
五. 非公平锁的释放
1. 非公平锁会导致饥饿
也就是说, 上一个线程释放锁后, `等待队列` 里的第一个线程就会被激活, 然后会执行tryAcquire方法. 如果这个时候有新的线程来争抢,
由于是非公平模式, 有可能新的线程会抢到这个锁. 如果新的线程抢到了锁, 那么刚刚被激活的线程(等待队列里的第一个线程)就是执行tryAcquire失败, 这个方法执行失败就意味着会被再次被挂起. 如果并发量严重, 很可能`等待队列`里的所有线程在一定时间内都无法被正常调度.也就是产生了线程饥饿的现象.
六. Condition简介
1. condition简介和demo
public class Main { private static MyReentrantLock lock = new MyReentrantLock(); private static Condition condition = lock.newCondition(); public static void main(String[] args) { new Thread(Main::funcA).start(); new Thread(Main::funcB).start(); } public static void funcA() { lock.lock(); System.out.println("await之前"); try { condition.await(); // 在这里等待被其他线程通知(signal) } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("await之后"); lock.unlock(); } public static void funcB() { lock.lock(); System.out.println("signal之前"); condition.signal(); // 这里会通知funcA的await().让funcA()继续执行下去 System.out.println("signal之后"); lock.unlock(); } }
运行这段代码, 输出如下:
如果还是没有体会到区别, 那么把main方法里的第二行注释掉, 然后再执行一下:
输出结果如下:
也就是说, await()会使当前线程挂起, 需要其他线程通知他, 他才能被激活(唤醒).
七. Condition实例化
1. 获取condition的例子
2. condition实例化的源码
newCondition方法在ReentrantLock类里的实现如下:
Sync类里的newCondition()方法如下:
ConditionObject是AQS里的一个内部类,实现自Condition, 类的声明如下(具体源码后面再解释):
八. condition的等待(await) 和 通知(signal)
1. 只执行一句await()后的流程
await()方法的具体实现在AQS里的内部类ConditionObject类里:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 添加到 condition 的`条件队列`中 Node node = addConditionWaiter(); // 完全释放锁,返回值是释放锁之前的 state 值 int savedState = fullyRelease(node); int interruptMode = 0; // 这里的isOnSyncQueue就是在判断node节点是否在锁的`等待队列`里 while (!isOnSyncQueue(node)) { // 在这里线程挂起 MyLockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //---- 程序不会执行到下面, 因为在前面就已经挂起了. if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
这里面有几个方法之前没提到过, 在这里一一攻破.
先解决addConditionWaiter()方法:
/** * 将当前线程对应的节点入队,插入队尾, 并且作为本方法的返回值. */ private Node addConditionWaiter() { // 本例子中的场景下, 只执行过一次await()方法, 所以是第一个进入`条件队列`的元素. // 所以lastWaiter和firstWaiter肯定都是null. Node t = lastWaiter; // 本例子中t==null, 所以这段if暂时不考虑吧 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 新建节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); // 因为t==null, 意思是队列目前还是空的, 所以这个节点是第一个节点, 所以是firstWaiter. if (t == null) firstWaiter = node; else t.nextWaiter = node; // 但node同时也是最后一个节点, 也就是lastWaiter lastWaiter = node; // 最后会返回本方法 return node; }
接下来是fullRelease(node)方法, 来完全释放锁:
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁,将 state 置为 0 if (release(savedState)) { failed = false; // 并且把释放锁之前的state值返回出去. (本例子中是1) return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
最后就是isOnSyncQueue(node)方法, 来判断锁的`等待队列`中有没有当前这个node:
/** * 这个方法就是判断 node 是否已经移动到sync queue了 * (signal 的时候会将节点从条件队列移到sync queue) */ final boolean isOnSyncQueue(Node node) { // 当进入Condition队列时,waitStatus肯定为CONDITION, // 如果同时别的线程调用signal,Node会从Condition队列中移除, // 并且移除时会清除CONDITION状态。 // 从移除到进入sync queue队列,中间这段时间prev必然为null,所以还是返回false,即被park if (node.waitStatus == Node.CONDITION || node.prev == null) // 本例子中, 会在这里返回 return false; //--- 本例子中, 程序不会往下执行了. 但是下面的代码还是分析一下吧. 这样待会儿就不用再重新讲个方法了. // 当别的线程进入sync queue队列时,会和前一个Node建立前后关系,所以如果next存在,说明一定在release队列中 if (node.next != null) // If has successor, it must be on queue return true; // 到这里还没找到, 那只能去锁的`等待队列`里一个一个找了 // 可能该Node刚刚最后一个进入release队列,所以是tail,其next必然是null,所以需要从队尾向前查找 // 这个方法的源码就不讲了, 太简单了, 就是链表从后往前找node.找到了就true.没找到就false. return findNodeFromTail(node); }
最终会执行到await()方法里的park()方法, 线程挂起. 等待被别的线程唤醒.
2. 只执行一句signal()后的流程
然后咱们看看signal()的源码.
由于firstWaiter==null, 所以first==null, signal方法直接就退出了.
3.一个线程await等待, 另一个线程用signal来唤醒
本场景的程序demo如下:
public class Main { private static Scanner scanner = new Scanner(System.in); private static MyReentrantLock lock = new MyReentrantLock(); private static Condition condition = lock.newCondition(); public static void main(String[] args) throws Exception { new Thread(Main::funcA).start(); new Thread(Main::funcB).start(); } public static void funcA() { lock.lock(); System.out.println("await之前"); try { condition.await(); // 在这里等待被其他线程通知(signal) } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("await之后"); lock.unlock(); } public static void funcB() { lock.lock(); System.out.println("signal之前"); System.out.print("请输入任意内容并回车, 以执行signal方法: "); scanner.next(); // 在这里进行阻塞, 在控制台输入任意内容后回车, 就会接触阻塞, 就会执行signal方法, 也就是通知funcA()方法. condition.signal(); // 这里会通知funcA的await().让funcA()继续执行下去 System.out.println("signal之后"); lock.unlock(); } }
首先, await()仍然执行到park这句, 然后挂起, 这点与本章第1小节的流程是一样的(看下图, 我选中的park那行代码, await就在这里挂起):
而此时控制台如下:
此时还没有执行signal, 因为我用输入流给signal方法进行阻塞了, 需要输入内容后回车, 就可以调用到signal方法.signal通知后,await就会被唤醒.
如下:
咱们分析一下signal是如何通知await, 然后让await线程被唤醒的:
因为刚才执行过await(), 所以firstWaiter不会是null. 所以会调用到doSignal方法:
上面这段代码也比较简单, 就是将firstWaiter为头的这个链表, 把第一个元素出队, 然后让第二个元素当新的头部. 然后让刚才出队的那个元素执行tansferForSignal方法.
/** * 将节点从条件队列转移到锁的`等待队列` * * true 代表成功转移 * false 代表在 signal 之前,节点已经取消了 */ final boolean transferForSignal(Node node) { /* * 在这里将 waitStatus 置为 0. * 如果成功设置为0, 那么继续往下面执行 * 如果CAS 失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消,那么直接return false. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * enq(node): 自旋进入阻塞队列的队尾.这个在将lock()方法的时候大家见到过.就是同一个方法. * 这里的返回值 p 是 node 在阻塞队列的前驱节点 */ Node p = enq(node); int ws = p.waitStatus; // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。 // 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用 // 因为节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1) if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程(但是本场景下不会执行到这里) MyLockSupport.unpark(node.thread); // 返回true return true; }
将上面这段代码总结一下就是: 将本节点的waitState设置为了0. 然后让本节点插入到到锁的`等待队列`, 然后将前驱节点的waitState设置为了1. 然后返回了true.
这一行的tansferForSignal返回了true, 取反了就是false了, 所以退出了 while循环. 至此signal方法就执行完毕了.
signal干的主要事情就是: 把`条件队列`里的第一个元素转移(尾插)到了锁的`等待队列`里.
`条件队列`就是firstWaiter为头结点的一个链表.
`等待队列`就是咱们上面将lock() unlock()的时候提到的锁的等待队列.
signal方法执行完了后, 接下来就该执行unlock()方法了. 如下图:
unlock()所做的事情就是, 释放当前的锁, 然后激活`等待队列`里的第一个线程.
而在本场景下, 现在等待队列里有且仅有一个元素, 就是signal方法转移的那个元素.
unlock()之前分析过, unlock会调用release方法:
release方法所做的就是释放锁(第一个红色代码), 然后唤醒`等待队列`里的第一个线程(第二个红色代码).
unlock()方法执行完了后, 刚才await挂起的那个线程就又被激活了.
所以接下来执行的是acquireQueued方法, 这个方法在将锁的时候讲过, 所以这里简单讲解一下:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (; ; ) { // 本场景下: node是队列里的第一个元素, 也就是await的线程对应的node. // 本场景下: p是node的前一个节点, 也就是head节点了 final Node p = node.predecessor(); // 本场景下: p==head. 锁现在空闲, tryAcquire也会成功. if (p == head && tryAcquire(arg)) { // 将node设置为新的head. head节点隐含的意思就是: head节点对应的线程是当前锁的持有者 setHead(node); p.next = null; // help GC failed = false; // 返回false. 因为本场景下该线程没有被中断过. return interrupted; } //--- 本场景下, 不会执行到下面的代码 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
最终这个方法返回了true. 接下来, await()方法继续执行剩下的几行代码就可以退出了:
这两行就是做了相应的维护操作, 和线程中断判断, 这里就不讲解了.
随后,await方法执行完了, 退出方法栈.
然后就继续往下执行. 执行System.out.println, 然后是unlock.
至此本段程序就执行完了.
以上是关于[源码分析]ReentrantLock & AbstractQueuedSynchronizer & Condition的主要内容,如果未能解决你的问题,请参考以下文章
concurrent互斥锁ReentrantLock & 源码分析