Java Review - 并发编程_ CountDownLatch原理&源码剖析

Posted 小小工匠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java Review - 并发编程_ CountDownLatch原理&源码剖析相关的知识,希望对你有一定的参考价值。

文章目录


Pre

每日一博 - CountDownLatch使用场景分析以及源码分析

在日常开发中经常会遇到需要在主线程中开启多个线程去并行执行任务,并且主线程需要等待所有子线程执行完毕后再进行汇总的场景。

在CountDownLatch出现之前一般都使用线程的join()方法来实现这一点,但是join方法不够灵活,不能够满足不同场景的需要,所以JDK开发组提供了CountDownLatch这个类,使用CountDownLatch会更优雅.


小Demo


 
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/12/19 10:46
 * @mark: show me the code , change the world
 */
public class CountDownLatchTest 

    // 创建一个CountDownLatch实例
    private static volatile CountDownLatch countDownLatch = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException 

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.submit(() -> 
            System.out.println(Thread.currentThread().getName() + " 模拟业务运行");

            try 
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 业务运行Over");
             catch (InterruptedException e) 
                e.printStackTrace();
            finally 
                // 子线程执行结束,减1
                countDownLatch.countDown();
            


        );


        executorService.submit(() -> 
            System.out.println(Thread.currentThread().getName() + " 模拟业务运行");

            try 
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 业务运行Over");
             catch (InterruptedException e) 
                e.printStackTrace();
            finally 
                // 子线程执行结束,减1
                countDownLatch.countDown();
            



        );

        // 等待子线程执行执行结束  返回
        countDownLatch.await();
        System.out.println( "子线程业务运行Over,主线程继续工作");

        executorService.shutdown();
    



如上代码中,

  • 创建了一个CountDownLatch实例,因为有两个子线程所以构造函数的传参为2。

  • 主线程调用countDownLatch.await()方法后会被阻塞。

  • 子线程执行完毕后调用countDownLatch.countDown()方法让countDownLatch内部的计数器减1

  • 所有子线程执行完毕并调用countDown()方法后计数器会变为0,这时候主线程的await()方法才会返回。


CountDownLatch VS join方法

  • 调用一个子线程的join()方法后,该线程会一直被阻塞直到子线程运行完毕

  • 而CountDownLatch则使用计数器来允许子线程运行完毕或者在运行中递减计数,也就是CountDownLatch可以在子线程运行的任何时候让await方法返回而不一定必须等到线程结束

  • 另外,使用线程池来管理线程时一般都是直接添加Runable到线程池,这时候就没有办法再调用线程的join方法了,就是说countDownLatch相比join方法让我们对线程同步有更灵活的控制


类图关系

从类图可以看出,CountDownLatch是使用AQS实现的。

通过下面的构造函数, 实际上是把计数器的值赋给了AQS的状态变量state,也就是这里使用AQS的状态值来表示计数器值。

 /**
     * Constructs a @code CountDownLatch initialized with the given count.
     *
     * @param count the number of times @link #countDown must be invoked
     *        before threads can pass through @link #await
     * @throws IllegalArgumentException if @code count is negative
     */
    public CountDownLatch(int count) 
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    
 Sync(int count) 
            setState(count);
        

        int getCount() 
            return getState();
        

核心方法&源码解析

接下来分析CountDownLatch中的几个重要的方法,看它们是如何调用AQS来实现功能的。

void await()

当线程调用CountDownLatch对象的await方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回

  • 当所有线程都调用了CountDownLatch对象的countDown方法后,也就是计数器的值为0时

  • 其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程就会抛出InterruptedException异常,然后返回

   public void await() throws InterruptedException 
        sync.acquireSharedInterruptibly(1);
    

await()方法委托sync调用了AQS的acquireSharedInterruptibly方法

	// AQS获取共享资源时响应中断的方法
   public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException 
         // 响应中断 
        if (Thread.interrupted())
            throw new InterruptedException();
       // 查看当前计数器是否为0 ,为0 直接返回,否则进入AQS队列等待  
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    

        protected int tryAcquireShared(int acquires) 
            return (getState() == 0) ? 1 : -1;
        

  • 由如上代码可知,该方法的特点是线程获取资源时可以被中断,并且获取的资源是共享资源。

  • acquireSharedInterruptibly首先判断当前线程是否已被中断,若是则抛出异常,否则调用sync实现的tryAcquireShared方法查看当前状态值(计数器值)是否为0,是则当前线程的await()方法直接返回,否则调用AQS的doAcquireSharedInterruptibly方法让当前线程阻塞。

  • 另外可以看到,这里tryAcquireShared传递的arg参数没有被用到,调用tryAcquireShared的方法仅仅是为了检查当前状态值是不是为0,并没有调用CAS让当前状态值减1。


boolean await(long timeout, TimeUnit unit)

当线程调用了CountDownLatch对象的该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回

  • 当所有线程都调用了CountDownLatch对象的countDown方法后,也就是计数器值为0时,这时候会返回true

  • 设置的timeout时间到了,因为超时而返回false

  • 其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出InterruptedException异常,然后返回

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException 
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    


void countDown()

线程调用该方法后,计数器的值递减,递减后如果计数器值为0则唤醒所有因调用await方法而被阻塞的线程,否则什么都不做

下面看下countDown()方法是如何调用AQS的方法的。

 /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() 
    	// 委托调用AQS的releaseShared
        sync.releaseShared(1);
    

AQS的方法

  /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if @link #tryReleaseShared returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        @link #tryReleaseShared but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from @link #tryReleaseShared
     */
    public final boolean releaseShared(int arg) 
    	// 调用syn实现的tryReleaseShared
        if (tryReleaseShared(arg)) 
        	// AQS释放资源的方法
            doReleaseShared();
            return true;
        
        return false;
    

在如上代码中,releaseShared首先调用了sync实现的AQS的tryReleaseShared方法,代码如下

       protected boolean tryReleaseShared(int releases) 
            // Decrement count; signal when transition to zero
            // 循环进行CAS, 直到当前线程成功弯沉CAS使计数器值(状态值state)减一 并更新state
            for (;;) 
                int c = getState();
                // 1 如果状态值为0 ,则直接返回
                if (c == 0)
                    return false;
               // 2 使用CAS让计数器减1     
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            
        

如上代码

  • 首先获取当前状态值(计数器值)。

  • 代码(1)判断如果当前状态值为0则直接返回false,从而countDown()方法直接返回

  • 否则执行代码(2)使用CAS将计数器值减1,CAS失败则循环重试,否则如果当前计数器值为0则返回true,返回true说明是最后一个线程调用的countdown方法,那么该线程除了让计数器值减1外,还需要唤醒因调用CountDownLatch的await方法而被阻塞的线程,具体是调用AQS的doReleaseShared方法来激活阻塞的线程

  • 这里代码(1)貌似是多余的,其实不然,之所以添加代码(1)是为了防止当计数器值为0后,其他线程又调用了countDown方法,如果没有代码(1),状态值就可能会变成负数。


long getCount()

获取当前计数器的值,也就是AQS的state的值,一般在测试时使用该方法

  /**
     * Returns the current count.
     *
     * <p>This method is typically used for debugging and testing purposes.
     *
     * @return the current count
     */
    public long getCount() 
        return sync.getCount();
    

在其内部还是调用了AQS的getState方法来获取state的值(计数器当前值)


小结

CountDownLatch是使用AQS实现的。使用AQS的状态变量state来存放计数器的值。

首先在初始化CountDownLatch时设置状态值(计数器值),当多个线程调用countdown方法时实际是原子性递减AQS的状态值。

当线程调用await方法后当前线程会被放入AQS的阻塞队列等待计数器为0再返回。其他线程调用countdown方法让计数器值递减1,当计数器值变为0时,当前线程还要调用AQS的doReleaseShared方法来激活由于调用await()方法而被阻塞的线程。

以上是关于Java Review - 并发编程_ CountDownLatch原理&源码剖析的主要内容,如果未能解决你的问题,请参考以下文章

Java Review - 并发编程_Unsafe

Java Review - 并发编程_Unsafe

Java Review - 并发编程_前置知识二

Java Review - 并发编程_抽象同步队列AQS

Java Review - 并发编程_ 回环屏障CyclicBarrier原理&源码剖析

Java Review - 并发编程_ThreadPoolExecutor原理&源码剖析