实战:JUC 中 AQS之共享锁源码分析

Posted Java技术实战

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实战:JUC 中 AQS之共享锁源码分析相关的知识,希望对你有一定的参考价值。

每天为您推送优质技术文章


搞清楚AQS独占锁的实现原理之后,再看共享锁的实现原理就会轻松很多。两种锁模式之间很多通用的地方本文只会简单说明一下,就不在赘述了,具体细节可以参考我的上篇文章

一、执行过程概述

获取锁的过程:

1、当线程调用 acquireShared()申请获取锁资源时,如果成功,则进入临界区。
2、当获取锁失败时,则创建一个共享类型的节点并进入一个FIFO等待队列,然后被挂起等待唤醒。
3、当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待。

释放锁过程:

当线程调用 releaseShared()进行锁资源释放时,如果释放成功,则唤醒队列中等待的节点,如果有的话。

二、源码深入分析

基于上面所说的共享锁执行流程,我们接下来看下源码实现逻辑:

此方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止

首先来看下获取锁的方法 acquireShared(),如下

    
      
      
    
  1. public final void acquireShared(int arg) {

  2. //尝试获取共享锁,返回值小于0表示获取失败

  3. if (tryAcquireShared(arg) < 0)

  4. //执行获取锁失败以后的方法

  5. doAcquireShared(arg);

  6. }

这里 tryAcquireShared()方法是留给用户去实现具体的获取锁逻辑的。关于该方法的实现有两点需要特别说明:

1、该方法必须自己检查当前上下文是否支持获取共享锁,如果支持再进行获取。该方法返回值是个重点。
2、如果返回值小于0表示获取锁失败,需要进入等待队列。
3、如果返回值等于0表示当前线程获取共享锁成功,但它后续的线程是无法继续获取的,也就是不需要把它后面等待的节点唤醒。
4、如果返回值大于0,表示当前线程获取共享锁成功且它后续等待的节点也有可能继续获取共享锁成功,也就是说此时需要把后续节点唤醒让它们去尝试获取共享锁。

有了上面的约定,我们再来看下 doAcquireShared方法的实现:

doAcquireShared(int arg)

    
      
      
    
  1. //参数不多说,就是传给acquireShared()的参数

  2. private void doAcquireShared(int arg) {

  3. //添加等待节点到队列尾部,方法跟独占锁一样,唯一区别就是节点类型变为了共享型,不再赘述

  4. final Node node = addWaiter(Node.SHARED);

  5. boolean failed = true;

  6. try {

  7. boolean interrupted = false;

  8. for (;;) {

  9. final Node p = node.predecessor();

  10. //表示前面的head节点已经获取到锁,自己会尝试获取锁

  11. if (p == head) {

  12. //尝试获取资源

  13. int r = tryAcquireShared(arg);

  14. //注意上面说的, 等于0表示不用唤醒后继节点,大于0需要

  15. if (r >= 0) {

  16. //这里是重点,获取到锁以后的唤醒操作,后面详细说

  17. //将head指向自己,还有剩余资源可以再唤醒之后的线程

  18. setHeadAndPropagate(node, r);

  19. p.next = null;

  20. //如果是因为中断醒来则设置中断标记位

  21. if (interrupted)

  22. selfInterrupt();

  23. failed = false;

  24. return;

  25. }

  26. }

  27. //挂起逻辑跟独占锁一样,不再赘述

  28. //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()

  29. if (shouldParkAfterFailedAcquire(p, node) &&

  30. parkAndCheckInterrupt())

  31. interrupted = true;

  32. }

  33. } finally {

  34. //获取失败的取消逻辑跟独占锁一样,不再赘述

  35. if (failed)

  36. cancelAcquire(node);

  37. }

  38. }

1、创建一个新节点(共享模式),加入到队尾,这个过程和独占模式一样,不再赘述;
2、判断新节点的前置节点是否为头节点,如果不是头节点,就将前置节点的状态标志位设置为 SIGNAL,当前线程可以安全地挂起,整个过程结束;
3、如果它的前趋是头节点,就让前置在共享模式下获取锁,如果获取成功,把当前节点设置为头节点;
4、设置为头节点之后,满足释放锁条件就阻塞等待释放锁。

满足释放锁的条件为:允许传播或者需要通知继任节点,或者继任节点是共享模式的节点

以下源码分析

    
      
      
    
  1. final Node node = addWaiter(Node.SHARED);

  2. shouldParkAfterFailedAcquire(p, node)

  3. parkAndCheckInterrupt()

  4. cancelAcquire(node);

参考我的上篇文章

独占锁模式获取成功以后设置头结点然后返回中断状态,结束流程。

共享锁模式获取成功以后,调用了 setHeadAndPropagate方法,从方法名就可以看出除了设置新的头结点以外还有一个传递动作,一起看下代码:

setHeadAndPropagate(Node node, int propagate)

    
      
      
    
  1. //两个入参,一个是当前成功获取共享锁的节点,一个就是tryAcquireShared方法的返回值,注意上面说的,它可能大于0也可能等于0

  2. private void setHeadAndPropagate(Node node, int propagate) {

  3. Node h = head; //记录当前头节点

  4. //设置新的头节点,即把当前获取到锁的节点设置为头节点,指向自己

  5. setHead(node);

  6. //这里意思有两种情况是需要执行唤醒操作

  7. //1.propagate > 0 表示调用方指明了后继节点需要被唤醒

  8. //2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点

  9. if (propagate > 0 || h == null || h.waitStatus < 0 ||

  10. (h = head) == null || h.waitStatus < 0) {

  11. Node s = node.next;

  12. //如果当前节点的后继节点是共享类型获取没有后继节点,则进行唤醒

  13. //这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒

  14. if (s == null || s.isShared())

  15. //后面详细说

  16. doReleaseShared();

  17. }

  18. }


  19. private void setHead(Node node) {

  20. head = node;

  21. node.thread = null;

  22. node.prev = null;

  23. }

此方法在 setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式!

梳理一下 acquireShared()的流程::

tryAcquireShared()尝试获取资源,成功则直接返回; 失败则通过 doAcquireShared()进入等待队列 park(),直到被 unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。 其实跟 acquire()的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作(这才是共享嘛)。

doReleaseShared

最终的唤醒操作也很复杂,专门拿出来分析一下:注:这个唤醒操作在 releaseShare()方法里也会调用。

    
      
      
    
  1. private void doReleaseShared() {

  2. for (;;) {

  3. //唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了

  4. //其实就是唤醒上面新获取到共享锁的节点的后继节点

  5. Node h = head;

  6. if (h != null && h != tail) {

  7. int ws = h.waitStatus;

  8. //表示后继节点需要被唤醒

  9. if (ws == Node.SIGNAL) {

  10. //这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark

  11. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

  12. continue;

  13. //执行唤醒操作

  14. unparkSuccessor(h);

  15. }

  16. //如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去

  17. else if (ws == 0 &&

  18. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

  19. continue;

  20. }

  21. //如果头结点没有发生变化,表示设置完成,退出循环

  22. //如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试

  23. if (h == head)

  24. break;

  25. }

  26. }

接下来看下释放共享锁的过程:

releaseShared

此方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源

    
      
      
    
  1. public final boolean releaseShared(int arg) {

  2. //尝试释放共享锁

  3. if (tryReleaseShared(arg)) {

  4. //唤醒过程,详情见上面分析

  5. doReleaseShared();

  6. return true;

  7. }

  8. return false;

  9. }

注:上面的 setHeadAndPropagate()方法表示等待队列中的线程成功获取到共享锁,这时候它需要唤醒它后面的共享节点(如果有),但是当通过 releaseShared()方法去释放一个共享锁的时候,接下来等待独占锁跟共享锁的线程都可以被唤醒进行尝试获取。

简单应用

相信大家已经基本理解AQS的原理了。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败则返回false。

三、总结

跟独占锁相比,共享锁的主要特征在于当一个在等待队列中的共享节点成功获取到锁以后(它获取到的是共享锁),既然是共享,那它必须要依次唤醒后面所有可以跟它一起共享当前锁资源的节点,毫无疑问,这些节点必须也是在等待共享锁(这是大前提,如果等待的是独占锁,那前面已经有一个共享节点获取锁了,它肯定是获取不到的)。当共享锁被释放的时候,可以用读写锁为例进行思考,当一个读锁被释放,此时不论是读锁还是写锁都是可以竞争资源的。

长按二维码关注「Java技术实战」

点个在看再走呗!

以上是关于实战:JUC 中 AQS之共享锁源码分析的主要内容,如果未能解决你的问题,请参考以下文章

JUC锁框架_AbstractQueuedSynchronizer详细分析

JUC学习之共享模型工具之JUC并发工具包上

Java并发之AQS源码分析

JUC系列LOCK框架系列之五 核心锁类AbstractQueuedSynchonizer

ReentrantLock 源码分析以及 AQS

Java高并发编程实战6,通过AQS源码分析lock()锁机制