JUCCountdownLatch源码分析

Posted LL.LEBRON

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUCCountdownLatch源码分析相关的知识,希望对你有一定的参考价值。

CountdownLatch

1.简介

CountDownLatch,可以翻译为倒计时器,但是似乎不太准确,它的含义是允许一个或多个线程等待其它线程的操作执行完毕后再执行后续的操作。

CountDownLatch的通常用法和Thread.join()有点类似,等待其它线程都完成后再执行主任务。

其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一。

类结构:

CountDownLatch中只包含了Sync一个内部类,它没有公平/非公平模式,所以它算是一个比较简单的同步器了。

这里还要注意一点,CountDownLatch没有实现Serializable接口,所以它不是可序列化的。

栗子演示:

@Slf4j(topic = "c.TestCountdownLatch")
public class TestCountdownLatch 
    public static void main(String[] args) throws InterruptedException 
        CountDownLatch latch = new CountDownLatch(3);
        new Thread(() -> 
            log.debug("begin...");
            Sleeper.sleep(1);
            latch.countDown();
            log.debug("end...");
        ).start();
        new Thread(() -> 
            log.debug("begin...");
            Sleeper.sleep(2);
            latch.countDown();
            log.debug("end...");
        ).start();
        new Thread(() -> 
            log.debug("begin...");
            Sleeper.sleep(1.5);
            latch.countDown();
            log.debug("end...");
        ).start();
        log.debug("waiting...");
        latch.await();
        log.debug("wait end...");
    

输出结果:

可以配合线程池使用,改进如下:

@Slf4j(topic = "c.TestCountdownLatch")
public class TestCountdownLatch 
    public static void main(String[] args) throws InterruptedException 

        CountDownLatch latch = new CountDownLatch(3);
        //创建线程池
        ExecutorService service = Executors.newFixedThreadPool(4);
        //提交一个任务
        service.submit(() -> 
            log.debug("begin...");
            Sleeper.sleep(1);
            latch.countDown();
            log.debug("end...", latch.getCount());
        );
        service.submit(() -> 
            log.debug("begin...");
            Sleeper.sleep(1.5);
            latch.countDown();
            log.debug("end...", latch.getCount());
        );
        service.submit(() -> 
            log.debug("begin...");
            Sleeper.sleep(2);
            latch.countDown();
            log.debug("end...", latch.getCount());
        );
        service.submit(() -> 
            try 
                log.debug("waiting...");
                latch.await();
                log.debug("wait end...");
             catch (InterruptedException e) 
                e.printStackTrace();
            
        );
    

输出结果:

2.应用—同步等待多线程准备完毕

模拟游戏加载:

@Slf4j(topic = "c.TestCountdownLatch")
 class TestCountdownLatch 

    public static void main(String[] args) throws InterruptedException 
        ExecutorService service=Executors.newFixedThreadPool(10);
        CountDownLatch latch=new CountDownLatch(10);
        Random r=new Random();
        String[] all=new String[10];

        for(int j=0;j<10;j++)
            int t=j;
            service.submit(()->
                for(int i=0;i<=100;i++)
                    try 
                        Thread.sleep(r.nextInt(100));
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                    all[t]=i+"%";
                    System.out.print("\\r"+Arrays.toString(all));
                
                latch.countDown();
            );
        
        latch.await();
        //所有玩家都加载100%才开始游戏
        System.out.println("\\n游戏开始");
        service.shutdown();
    

结果:

3.应用—同步等待多个远程调用结束

@RestController
public class TestCountDownlatchController 
    @GetMapping("/order/id")
    public Map<String, Object> order(@PathVariable int id) 
        HashMap<String, Object> map = new HashMap<>();
        map.put("id", id);
        map.put("total", "2300.00");
        sleep(2000);
        return map;
    
    @GetMapping("/product/id")
    public Map<String, Object> product(@PathVariable int id) 
        HashMap<String, Object> map = new HashMap<>();
        if (id == 1) 
            map.put("name", "小爱音箱");
            map.put("price", 300);
         else if (id == 2) 
            map.put("name", "小米手机");
            map.put("price", 2000);
        
        map.put("id", id);
        sleep(1000);
        return map;
    
    @GetMapping("/logistics/id")
    public Map<String, Object> logistics(@PathVariable int id) 
        HashMap<String, Object> map = new HashMap<>();
        map.put("id", id);
        map.put("name", "中通快递");
        sleep(2500);
        return map;
    
    private void sleep(int millis) 
        try 
            Thread.sleep(millis);
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

rest 远程调用:

RestTemplate restTemplate = new RestTemplate();
log.debug("begin");
ExecutorService service = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(4);
Future<Map<String,Object>> f1 = service.submit(() -> 
    Map<String, Object> r =
        restTemplate.getForObject("http://localhost:8080/order/1", Map.class, 1);
    return r;
);
Future<Map<String, Object>> f2 = service.submit(() -> 
    Map<String, Object> r =
        restTemplate.getForObject("http://localhost:8080/product/1", Map.class, 1);
    return r;
);
Future<Map<String, Object>> f3 = service.submit(() -> 
    Map<String, Object> r =
        restTemplate.getForObject("http://localhost:8080/product/1", Map.class, 2);
    return r;
);
Future<Map<String, Object>> f4 = service.submit(() -> 
    Map<String, Object> r =
        restTemplate.getForObject("http://localhost:8080/logistics/1", Map.class, 1);
    return r;
);
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
System.out.println(f4.get());
log.debug("执行完毕");
service.shutdown();

执行结果:

19:51:39.711 c.TestCountDownLatch [main] - begin 
total=2300.00, id=1 
price=300, name=小爱音箱, id=1 
price=2000, name=小米手机, id=2 
name=中通快递, id=1 
19:51:42.407 c.TestCountDownLatch [main] - 执行完毕

4.内部类Sync

Sync重写了tryAcquireShared()tryReleaseShared()方法,并把count存到state变量中去。

private static final class Sync extends AbstractQueuedSynchronizer 
    private static final long serialVersionUID = 4982264981922014374L;

    //传入初始次数
    Sync(int count) 
        setState(count);
    

    //获取剩余的次数
    int getCount() 
        return getState();
    

    //尝试获取共享锁
    protected int tryAcquireShared(int acquires) 
        //这里state等于0的时候返回的是1,也就是说count减为0的时候获取总是成功
        //state不等于0的时候返回的是-1,也就是count不为0的时候总是要排队
        return (getState() == 0) ? 1 : -1;
    

    //尝试释放锁
    protected boolean tryReleaseShared(int releases) 
        for (;;) 
            //获取state的值
            int c = getState();
            //等于0,无法再是否,返回false
            if (c == 0)
                return false;
            //减一
            int nextc = c-1;
            //原子更新state的值
            if (compareAndSetState(c, nextc))
                //减为0的时候返回true,这时会唤醒后面排队的线程
                return nextc == 0;
        
    

5.构造方法

构造方法需要传入一个count,也就是初始次数。

public CountDownLatch(int count) 
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);

6.await()方法

await()方法是等待其它线程完成的方法,它会先尝试获取一下共享锁,如果失败则进入AQS的队列中排队等待被唤醒。

根据Sync的源码,state不等于0的时候tryAcquireShared()返回的是-1,也就是说count未减到0的时候所有调用await()方法的线程都要排队。

//java.util.concurrent.CountDownLatch.await()
public void await() throws InterruptedException 
    //调用AQS的acquireSharedInterruptibly()方法
    sync.acquireSharedInterruptibly(1);

//java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly()
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException 
    if (Thread.interrupted())
        throw new InterruptedException();
    //试获取锁,如果失败则排队
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);

7.countDown()方法

countDown()方法,会释放一个共享锁,也就是count的次数会减1。

根据上面Sync的源码,我们知道,tryReleaseShared()每次会把count的次数减1,当其减为0的时候返回true,这时候才会唤醒等待的线程。

doReleaseShared()是唤醒等待的线程,这个方法我们在前面的章节中分析过了。

//java.util.concurrent.CountDownLatch.countDown()
public void countDown() 
    //调用AQS的释放共享锁方法
    sync.releaseShared(1);

//java.util.concurrent.locks.AbstractQueuedSynchronizer.releaseShared
public final boolean releaseShared(int arg) 
    //如果尝试释放锁成功,就唤醒下一个节点
    if (tryReleaseShared(arg)) 
        //唤醒下一个节点
        doReleaseShared();
        return true;
    
    return false;

8.总结

  1. CountDownLatch表示允许一个或多个线程等待其它线程的操作执行完毕后再执行后续的操作
  2. CountDownLatch使用AQS的共享锁机制实现
  3. CountDownLatch初始化的时候需要传入次数count
  4. 每次调用countDown()方法count的次数减1
  5. 每次调用await()方法的时候会尝试获取锁,这里的获取锁其实是检查AQS的state变量的值是否为0
  6. 当count的值(也就是state的值)减为0的时候会唤醒排队着的线程(这些线程调用await()进入队列)

以上是关于JUCCountdownLatch源码分析的主要内容,如果未能解决你的问题,请参考以下文章

Mesos源码分析

Mybatis源码分析

Spring源码分析专题——目录

ARouter源码分析

Handler源码分析

Eureka源码分析(六) TimedSupervisorTask