AQS实现共享锁及CountDownLatch源码解析

Posted Pimow博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS实现共享锁及CountDownLatch源码解析相关的知识,希望对你有一定的参考价值。

内容目录

AQS实现共享锁释放共享锁ConuntDownLatch简介CountDownLatch源码解析内部类syncCountDownLatch()初始化await()方法countDown()方法总结阅读更多文章

AQS实现共享锁

1:共享锁shared是一个乐观锁。可以允许多个线程阻塞点,可以多个线程同时获取到锁。
它允许一个资源可以被多个读操作,或者被一个写操作访问,但是两个操作不能同时访问。

  /**
   * 获取共享锁
   */

  public final void acquireShared(int arg) {
     //tryReleaseShared()返回值标志了是否允许其他线程继续进入获取失败;返回大于等于允许,否则不允许
    if (tryAcquireShared(arg) < 0)
       //获取锁失败调用该方法
       doAcquireShared(arg);
 }

    private void doAcquireShared(int arg) {
    //把当前节点添加对clh队列中,addWaiter()讲aqs实现独占锁已经详细分析,此处讲解大概
    //1:快速添加,判断tail节点是否为空,不为空直接添加
    //2:为空调用enq()方法,初始化head和tail
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
   try {
       boolean interrupted = false;
       for (;;) {
         //获取当前节点的prev节点
        final Node p = node.predecessor();
        //判断是否是head头结点
        if (p == head) {
            //返回负数表示获取失败;返回大于等于0表示成功
            int r = tryAcquireShared(arg);
            // 一旦共享获取成功,设置新的头结点,并且唤醒后继线程
            if (r >= 0) {
                setHeadAndPropagate(node, r);
                p.next = null// help GC
                if (interrupted)
                    selfInterrupt();
                failed = false;
                return;
            }
        }
        //shouldParkAfterFailedAcquire()之前也讲过,此处讲解大概
        //1:获取前驱节点状态,等于-1返回true,大于0循环删除掉取消的节点,
        //执行到这里代表节点是0或者PROPAGATE,CAS修改为SIGNAL,但是还不能park挂起线程。需要重试是否能获取,如果不能则挂起。
        //重试的目的是有可能代码执行到这里的时候上个线程unpark啦。
        if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
            interrupted = true;
    }
finally {
    if (failed)
        cancelAcquire(node);
   }
}

/**
* 这个函数做的事情有两件:
* 1. 设置当前节点为head节点
* 2. 判断是否要唤醒后继节点
*/

 private void setHeadAndPropagate(Node node, int propagate) {
  // head节点赋值
  Node h = head;
  //把当前节点设置为head
  setHead(node);
/*
 * propagate是tryAcquireShared()的返回值,是传播唤醒的依据之一。
 * h.waitStatus为SIGNAL或者PROPAGATE时也根据node的下一个节点共享来决定是否传播唤醒,此处不涉及condition只需考虑waitstatus小于0
 * h == null 不知道是什么情况出现的,如果有知道的大佬希望可以指点
 * 这里为什么不能只用propagate > 0来决定是否可以传播? 网上查阅资料说是为了解决信号量的bug
 */

 if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    /*
     * s 如果是共享类型节点,则应该唤醒该节点
     * s=null,并不能说明s是尾节点 为什么会为空呢?由于cpu的切换,可能会导导致tail节点跟前面的节连接不上。enq()方法已经讲了原理
     */
 
    if (s == null || s.isShared())
        doReleaseShared();
   }
}

/**
 * 唤醒下一个线程或者设置传播状态。
 * 后继线程被唤醒后,会尝试获取共享锁,如果成功之后,则又会调用setHeadAndPropagate,将唤醒传播下去。
 * 这个函数的作用是保障在acquire和release存在竞争的情况下,保证队列中处于等待状态的节点能被唤醒。
 */

private void doReleaseShared() {

for (;;) {
    Node h = head;
    // 如果队列中存在尾节点,并且head节点不为空
    if (h != null && h != tail) {
        //获取h节点状态
        int ws = h.waitStatus;
         //如果等于SIGNAL,则把这个节点状态改为0,并且唤醒这个节点
        if (ws == Node.SIGNAL) {
            if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                continue;
            unparkSuccessor(h);
        }
        // 如果h节点的状态为0,需要设置为PROPAGATE。
        //这样的话获取锁的线程在执行setHeadAndPropagate时可以读到PROPAGATE,从而由获取锁的线程去释放后继等待线程。
        else if (ws == 0 &&
                 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
            continue;
    }
    // 检查h是否仍然是head,如果不是的话需要再进行循环。
    if (h == head)
        break;
    }
}

释放共享锁

  public final boolean releaseShared(int arg) {
   if (tryReleaseShared(arg)) {
    //实现共享锁的时候已经介绍改方法
    doReleaseShared();
    return true;
    }
     return false;
  }

ConuntDownLatch简介

for example:大头儿子和小头爸爸各自出去玩,出门前,妈妈说,你们两个要一起回家,如果大头儿子没回来,那小头爸爸就只能在家门口等着,我不开门。你们必须要一起敲门回来我再开门。这个例子说明CountDownLatch有一个计数器,当计数器不为零时所有线程一直阻塞。当计数器为零时,则所有等待的线程就全部唤醒开始工作。

countDownLatch类中只有下面几个方法,实现蛮简单。其中有一个内部类sync继承AbstractQueuedSynchronizer

CountDownLatch源码解析

内部类sync

 /**
 * 使用AQS的state代表count的值
 */

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
    //初始化state值,调用aqs的setState()设置值
    Sync(int count) {
        setState(count);
    }
    //获取state值
    int getCount() {
        return getState();
    }
    //获取共享锁
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    //释放共享锁
    protected boolean tryReleaseShared(int releases) {
        // 递减state,如果state数为0返回false
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            //state减1,通过cas修改state值
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                //如果state=1返回true,返回true会调用aqs类doReleaseShared()方法
                return nextc == 0;
        }
    }
}

CountDownLatch()初始化

   public CountDownLatch(int count) {
      if (count < 0
  throw new IllegalArgumentException("count < 0");
      //构造函数内完成了sync的初始化,并设置state值
      this.sync = new Sync(count);
}  

await()方法

1:判断初始化的state是否等于0,等于直接往后执行。否则执行doAcquireSharedInterruptibly,会阻塞

 public void await() throws InterruptedException {
    //调用了aqs的acquireSharedInterruptibly()方法
    sync.acquireSharedInterruptibly(1);
  }

 public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException 
{
     //判断线程是否终端,终端抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //tryAcquireShared()方法判断state是否等于0,0返回1,否则-1
    if (tryAcquireShared(arg) < 0)
        //如果返回-1说明还有线程没有执行countDown()
        doAcquireSharedInterruptibly(arg);
}

/**
 * 实现共享锁可中断
 */

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException 
{
    //加入一个节点到CLH队列,这个加入节点的过程在之前的aqs文章已讲解
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            //获取当前节点的prev节点
            final Node p = node.predecessor();
            //判断节点是否是头节点
            if (p == head) {
                //判断state是否等于0,0返回1,否则-1
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null// help GC
                    failed = false;
                    return;
                }
            }
             //此处逻辑跟之前AQS实现独占锁逻辑一样
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

countDown()方法

1:state的值-1,

 public void countDown() {
    //调用aqs的releaseShared()方法,然后调用sync类中tryReleaseShared()并且返回true,说明要唤醒阻塞的线程,调用 doReleaseShared();
    sync.releaseShared(1);
  }

总结

一:AQS实现共享锁
1:获取共享同步状态
2:若获取失败,生成节点,进入队列
3:如果前驱为头结点,再次获取共享同步状态
4:获取成功则将自己设为头结点,如果后继节点是共享类型的,则唤醒
5:将节点状态设为 SIGNAL,失败则unpark。如果节点状态为0,修改为PROPAGATE
二:countDountLatch()
1:初始化state
2:调用一次countDown(),然后调用sync类中tryReleaseShared()并且返回true,说明要唤醒阻塞的线程,调用doReleaseShared();返回false说明不需阻塞。
3:await()判断初始化的state是否等于0,等于直接往后执行。否则执行doAcquireSharedInterruptibly。继续判断是否是头结点,并且再一次判断state的值。等于0设置为setHeadAndPropagate。否则阻塞。

阅读更多文章


以上是关于AQS实现共享锁及CountDownLatch源码解析的主要内容,如果未能解决你的问题,请参考以下文章

JDK源码分析深入源码分析CountDownLatch

AQS源码剖析第三篇--共享模式

CountDownLatch源码解析

JUC包中的CountDownLatch源码实现分析

浅析JUC-CountDownLatch

浅析JUC-CountDownLatch