深入理解J.U.C并发工具类
Posted sakura1027
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解J.U.C并发工具类相关的知识,希望对你有一定的参考价值。
1. CountDownLatch
1.1 什么是CountDownLatch
如果要用一句话说明CountDownLatch的用处,那就是用来控制一个线程等待多个线程
1.2 CountDownLatch典型应用
第一种用法:
假设有一场马拉松比赛,那么对参赛者的排名肯定是在所有参赛者跑完比赛之后进行,即N个线程执行操作,主线程等到N个子线程执行完毕之后再执行
package juc;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.stream.IntStream;
/**
* @author zhaobin11@baidu.com
*/
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
int threadCount = 10;
final CountDownLatch latch = new CountDownLatch(threadCount);
IntStream.range(0, threadCount).forEach(i ->
new Thread(() -> {
System.out.println(Thread.currentThread().getId() + "号选手开始出发");
try {
// 模拟运动员跑步耗时
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getId() + "号选手已到达终点");
latch.countDown();
}).start()
);
latch.await();
System.out.println("所有选手到达终点,开始计算排名");
}
}
运行结果为
13号选手开始出发
16号选手开始出发
15号选手开始出发
12号选手开始出发
14号选手开始出发
19号选手开始出发
18号选手开始出发
17号选手开始出发
20号选手开始出发
21号选手开始出发
19号选手已到达终点
20号选手已到达终点
17号选手已到达终点
21号选手已到达终点
16号选手已到达终点
13号选手已到达终点
15号选手已到达终点
18号选手已到达终点
14号选手已到达终点
12号选手已到达终点
所有选手到达终点,开始计算排名
CountDownLatch允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行
CountDownLatch有3个重要的变量&方法
- count:初始化count数表示子线程计数器,只有为0时,主线程才会向下执行
- countDown()方法:计数器的值-1,如果计数达到0,释放所有等待的线程
- await()方法:使主线程等到计数器为0才执行
接下来的两种都是作者Doug Lea给出来的关于CountDownLatch比较好的一些实践,更具模板性
第二种用法:
CountDownLatch初始化为N可以用来作一个线程等待,直到N个线程完成某项操作,或某些动作已经完成N次
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don‘t let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {
} // return;
}
void doWork() { ...}
}
第三种用法:
另一个典型的用法是把一个问题划分为N个部分,每一个部分用一个线程执行并把所有的线程放在Executor里面排队,执行完成就调用countDown方法,当所有子线程都执行完成,接应线程就可以通过await,即不被CountDownLatch的await方法阻塞了
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // wait for all to finish
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {
} // return;
}
void doWork() { ...}
}
1.3 CountDownLatch源码分析
// CountDownLatch底层由AQS支持
public class CountDownLatch {
// 内部类Sync
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (; ; ) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
// 构造一个给定初始化为count的CountDownLatch count是线程能通过await方法之前需要调用countDown方法的次数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/*
使当前线程在倒计数为零前一直等待,除非线程被中断
如果当前count值为0,那么await方法会立即返回 如果当前count值大于0,当前线程会处于休眠状态直到:
1.由于调用countDown方法使得count值达到0 2.其他某个线程中断了当前线程
如果当前线程在进入await方法时已经设置了中断状态,或者在等待时被中断,则抛出InterruptedException异常,并清除当前线程的已中断状态
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/*
使当前线程在倒计数为零前一直等待,除非线程被中断或超出了指定的等待时间
如果当前count值为0,那么await方法会立即返回true 如果当前count值大于0,当前线程会处于休眠状态直到:
1.由于调用countDown方法使得count值达到0 2.其他某个线程中断了当前线程 3.已超出指定的等待时间
如果计数达到零,那么await(long timeout, TimeUnit unit)会返回true
如果当前线程在进入await方法时已经设置了中断状态,或者在等待时被中断,则抛出InterruptedException异常,并清除当前线程的已中断状态
如果超出了指定的等待时间,则返回值为false,如果指定时间小于或等于0,那么await(long timeout, TimeUnit unit)根本不会等待,即不会阻塞主线程
timeout:要等待的最长时间 unit:timeout参数的时间单位
如果计数到达零,则返回true 如果在计数到达零之前超过了等待时间,则返回false
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/*
递减闭锁的count值,如果计数到达零,则释放所有等待的线程
当前count值大于0,递减 当前count等于0,被阻塞的线程可以被调度了,且count值不会递减了
*/
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
以上是关于深入理解J.U.C并发工具类的主要内容,如果未能解决你的问题,请参考以下文章
j.u.c系列(09)---之并发工具类:CyclicBarrier
阿里集团中间件4面:J.U.C并发框架+RocketMQ +MyCat+锁机制+架构