深入理解J.U.C并发工具类

Posted sakura1027

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解J.U.C并发工具类相关的知识,希望对你有一定的参考价值。

1. CountDownLatch

1.1 什么是CountDownLatch

如果要用一句话说明CountDownLatch的用处,那就是用来控制一个线程等待多个线程
技术图片

1.2 CountDownLatch典型应用

第一种用法:
假设有一场马拉松比赛,那么对参赛者的排名肯定是在所有参赛者跑完比赛之后进行,即N个线程执行操作,主线程等到N个子线程执行完毕之后再执行

package juc;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.stream.IntStream;

/**
 * @author zhaobin11@baidu.com
 */
public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        int threadCount = 10;
        final CountDownLatch latch = new CountDownLatch(threadCount);

        IntStream.range(0, threadCount).forEach(i ->
                new Thread(() -> {
                    System.out.println(Thread.currentThread().getId() + "号选手开始出发");
                    try {
                        // 模拟运动员跑步耗时
                        Thread.sleep(new Random().nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getId() + "号选手已到达终点");
                    latch.countDown();
                }).start()
        );

        latch.await();
        System.out.println("所有选手到达终点,开始计算排名");
    }
}

运行结果为

13号选手开始出发
16号选手开始出发
15号选手开始出发
12号选手开始出发
14号选手开始出发
19号选手开始出发
18号选手开始出发
17号选手开始出发
20号选手开始出发
21号选手开始出发
19号选手已到达终点
20号选手已到达终点
17号选手已到达终点
21号选手已到达终点
16号选手已到达终点
13号选手已到达终点
15号选手已到达终点
18号选手已到达终点
14号选手已到达终点
12号选手已到达终点
所有选手到达终点,开始计算排名

CountDownLatch允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行
CountDownLatch有3个重要的变量&方法

  1. count:初始化count数表示子线程计数器,只有为0时,主线程才会向下执行
  2. countDown()方法:计数器的值-1,如果计数达到0,释放所有等待的线程
  3. await()方法:使主线程等到计数器为0才执行

接下来的两种都是作者Doug Lea给出来的关于CountDownLatch比较好的一些实践,更具模板性
第二种用法:
CountDownLatch初始化为N可以用来作一个线程等待,直到N个线程完成某项操作,或某些动作已经完成N次

class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        doSomethingElse();            // don‘t let run yet
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        doneSignal.await();           // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

第三种用法:
另一个典型的用法是把一个问题划分为N个部分,每一个部分用一个线程执行并把所有的线程放在Executor里面排队,执行完成就调用countDown方法,当所有子线程都执行完成,接应线程就可以通过await,即不被CountDownLatch的await方法阻塞了

class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch doneSignal = new CountDownLatch(N);
        Executor e = ...

        for (int i = 0; i < N; ++i) // create and start threads
            e.execute(new WorkerRunnable(doneSignal, i));

        doneSignal.await();           // wait for all to finish
    }
}

class WorkerRunnable implements Runnable {
    private final CountDownLatch doneSignal;
    private final int i;

    WorkerRunnable(CountDownLatch doneSignal, int i) {
        this.doneSignal = doneSignal;
        this.i = i;
    }

    public void run() {
        try {
            doWork(i);
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

1.3 CountDownLatch源码分析

// CountDownLatch底层由AQS支持
public class CountDownLatch {
    // 内部类Sync
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (; ; ) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    // 构造一个给定初始化为count的CountDownLatch count是线程能通过await方法之前需要调用countDown方法的次数
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /*
    使当前线程在倒计数为零前一直等待,除非线程被中断
    如果当前count值为0,那么await方法会立即返回    如果当前count值大于0,当前线程会处于休眠状态直到:
    1.由于调用countDown方法使得count值达到0    2.其他某个线程中断了当前线程
    如果当前线程在进入await方法时已经设置了中断状态,或者在等待时被中断,则抛出InterruptedException异常,并清除当前线程的已中断状态
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /*
    使当前线程在倒计数为零前一直等待,除非线程被中断或超出了指定的等待时间
    如果当前count值为0,那么await方法会立即返回true    如果当前count值大于0,当前线程会处于休眠状态直到:
    1.由于调用countDown方法使得count值达到0    2.其他某个线程中断了当前线程    3.已超出指定的等待时间
    如果计数达到零,那么await(long timeout, TimeUnit unit)会返回true
    如果当前线程在进入await方法时已经设置了中断状态,或者在等待时被中断,则抛出InterruptedException异常,并清除当前线程的已中断状态
    如果超出了指定的等待时间,则返回值为false,如果指定时间小于或等于0,那么await(long timeout, TimeUnit unit)根本不会等待,即不会阻塞主线程
    timeout:要等待的最长时间    unit:timeout参数的时间单位

    如果计数到达零,则返回true    如果在计数到达零之前超过了等待时间,则返回false
    */
    public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /*
    递减闭锁的count值,如果计数到达零,则释放所有等待的线程
    当前count值大于0,递减    当前count等于0,被阻塞的线程可以被调度了,且count值不会递减了
    */
    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}






以上是关于深入理解J.U.C并发工具类的主要内容,如果未能解决你的问题,请参考以下文章

j.u.c系列(09)---之并发工具类:CyclicBarrier

Java并发和高并发学习总结- J.U.C之工具类

阿里集团中间件4面:J.U.C并发框架+RocketMQ +MyCat+锁机制+架构

高并发第八弹:J.U.C起航(java.util.concurrent)

并发编程-线程安全策略之并发容器(J.U.C)中的集合类

并发容器J.U.C --组件FutureTaskForkJoinBlockingQueue