JAVA进阶之路-CountDownLatch源码走读

Posted LuckyZhouStar

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JAVA进阶之路-CountDownLatch源码走读相关的知识,希望对你有一定的参考价值。

前言

本章用到了之前谈到的AQS,就是在该FIFO阻塞框架的基础上改造的,不理解的,可以去看JAVA进阶之路-AbstractQueuedSynchronizer(AQS)源码走读

用途

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。说得通俗易懂一点,也可以理解成是一个带有计数器功能的锁,当计数器的数值为 0 时就会释放这个锁。

CountDownLatch 和 ReentrantLock 一样,都是基于 AQS 实现的锁。(关于 AQS 和 ReentrantLock,可以参考 ReentrantLock 源码解析)但它们的不同之处在于,ReentrantLock 是独占锁,CountDownLatch 是共享锁。

所谓独占锁就是在同一时间内只有一条线程能够持有这个锁,而共享锁则是在同一时间内可以有一个或多个线程持有这个锁。
主要的就是两个方法,一个是await方法,一个是countDown方法

代码解读

//初始化state变量值
  public CountDownLatch(int count) 
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    


//阻塞方法,获取锁的过程
 public void await() throws InterruptedException 
        sync.acquireSharedInterruptibly(1);
    

阻塞方法,获取锁的过程
   public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException 
            //如果该阻塞的线程,某个地方被中断过的话,直接抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
            //尝试获取锁代码
        if (tryAcquireShared(arg) < 0)
        //小于0的时候,代表还有线程处于阻塞状态
            doAcquireSharedInterruptibly(arg);
    


//如果当前的count值不等于0,直接返回-1,否则返回1
 protected int tryAcquireShared(int acquires) 
            return (getState() == 0) ? 1 : -1;
        

 
 //该部分代码与AQS的代码原理一样,增加一个node节点,来阻塞当前的线程,如果调用wait多次的话,就会形成一个CLH的阻塞队列节点链表,内部通过自旋的方式来进行判断获取锁的逻辑
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException 
        final Node node = addWaiter(Node.SHARED);
        try 
            for (;;) 
                final Node p = node.predecessor();
                if (p == head) 
                //如果当前节点是头节点,就再次获取锁
                    int r = tryAcquireShared(arg);
                    //如果count被减少为0了,就进行释放操作
                    if (r >= 0) 
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            
         catch (Throwable t) 
            cancelAcquire(node);
            throw t;
        
    

  private void setHeadAndPropagate(Node node, int propagate) 
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) 
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        
    



//通过自旋的方式,来释放阻塞节点上的线程
 private void doReleaseShared() 
   
        for (;;) 
            Node h = head;
            if (h != null && h != tail) 
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) 
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            
            if (h == head)                   // loop if head changed
                break;
        
    

//对count减1操作
 public void countDown() 
        sync.releaseShared(1);
    

//释放锁操作
public final boolean releaseShared(int arg) 
        if (tryReleaseShared(arg)) 
        //如果当前count只为0.则进行释放锁的逻辑
            doReleaseShared();
            return true;
        
        return false;
    

//cas的方式来更新count值
 protected boolean tryReleaseShared(int releases) 
            // Decrement count; signal when transition to zero
            for (;;) 
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            
        

总结

1、内部通过一个count值来控制线程之间的状态交互
2、当await的时候,会在CLH中添加一个node节点,并阻塞当前的线程,内部通过自旋的方式来一直判断当前count值,从而来释放当前线程
3、countdown()的时候,会对count值减1,并判断如果为0的话,就会进行释放锁的逻辑,也就是环形CLH队列中各个节点上的线程

以上是关于JAVA进阶之路-CountDownLatch源码走读的主要内容,如果未能解决你的问题,请参考以下文章

java进阶之路-java中的threadlocal源码实现

JAVA进阶之路-AbstractQueuedSynchronizer(AQS)源码走读

JAVA进阶之路-AbstractQueuedSynchronizer(AQS)源码走读

《Netty进阶之路》目录

死磕 java同步系列之CountDownLatch源码解析

Java Review - 并发编程_ CountDownLatch原理&源码剖析