CountDownLatch和CyclicBarrier
Posted vv_a
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CountDownLatch和CyclicBarrier相关的知识,希望对你有一定的参考价值。
CountDownLatch
线程同步协作,await等待其他所有线程完成倒计时后,恢复运行
state的初始值为count
内部维护了一个AQS同步器,每次countDown后,会进行CAS修改state减1,修改后state为0,则唤醒被阻塞的线程
CountDownLatch是共享锁的一种实现
调用await的时候,如果state不为0,就证明任务还没有执行完毕,await会一直阻塞,await方法之后的语句不会被执行。
然后,CountDownLatch会自旋CAS判断state是否为0,如果state为0,就会释放所有等待的线程,await之后的语句得到执行
CountDownLatch latch = new CountDownLatch(3); // 计数值一般和创建的线程数相同
new Thread(() ->
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
latch.countDown(); // 计数减一
).start();
new Thread(() ->
try
TimeUnit.SECONDS.sleep(5);
catch (InterruptedException e)
e.printStackTrace();
latch.countDown();
).start();
new Thread(() ->
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
latch.countDown();
).start();
log.debug("waiting...");
latch.await(); // 阻塞, 等待计数归0(也即3个线程执行完毕), 主线程被唤醒
log.debug("wait end...");
CountDownLatch latch = new CountDownLatch(3);
ExecutorService pool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 3; i++)
pool.submit(() ->
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
log.debug(": 计数减一",Thread.currentThread().getName());
latch.countDown();
);
pool.submit(() ->
log.debug("waiting...");
try
latch.await(); // 等待上面三个线程跑完任务
catch (InterruptedException e)
e.printStackTrace();
log.debug("wait end");
);
ExecutorService pool = Executors.newFixedThreadPool(10); // 10名玩家
CountDownLatch latch = new CountDownLatch(10); // 计数
Random random = new Random();
String[] all = new String[10];
for (int i = 0; i < 10; i++)
int d = i;
pool.submit(() ->
for (int j = 0; j <= 100; j++)
try
Thread.sleep(random.nextInt(100));
catch (InterruptedException e)
e.printStackTrace();
all[d] = j + "%";
System.out.print("\\r" + Arrays.toString(all));
latch.countDown();
);
latch.await(); // 等待10个玩家都加载到"100%"
System.out.println("\\n游戏开始");
CountDownLatch是一次性的,计算器的值只能在构造器中设置,之后不可以再次设置,CountDownLatch使用完毕后,不能再次被使用。
CyclicBarrier
线程池的线程数要和计数值一致
CountDownLatch的实现是基于AQS,CycliBarrier是基于ReentrantLock(ReentrantLock也属于AQS同步器)和Condition。
CyclicBarrier内部维护count变量作为计数器,count的初始值为parties的值。
每个线程到了栅栏这里,将count减1,减1后不是0,就到Condition中阻塞。
count减1后变为0了,表示当前线程是这一代栅栏等待的最后一个线程,当前线程执行构造器中传的任务,然后唤醒之前被阻塞的所有线程。
然后重置count,开启下一代。
// 场景:有一个栅栏(或者称为障碍物),假设构造时parties传了3
// 前2个线程调用await被阻塞住,第3个线程来了,栅栏才会放行,3个线程才可以继续执行后面的代码
// 第1个和第2个线程调用了await, count值减1, 减完之后不是0, 所以这两个线程都会到Condition休息室中等待
// 第3个线程调用await时,count值减1之后变为0,(如果构造CyclicBarrier时传了task,则第3个线程先会执行那个task)
// 然后第3个线程唤醒Condition中的所有线程,并且将count重置为3,然后第3个线程继续执行await后面的代码
public CyclicBarrier(int parties, Runnable barrierAction)
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;// 一代中栅栏需要等待的线程
this.count = parties;// 计数器(每次调用await, count减1)
this.barrierCommand = barrierAction; // 第parties个线程调用await后,会执行这个task,然后唤醒Condition中的线程
public CyclicBarrier(int parties)
this(parties, null);
private void nextGeneration()
// signal completion of last generation
trip.signalAll(); // 唤醒Condition中的所有线程
// set up next generation
count = parties; // 重置count
generation = new Generation();
int index = --count;
if (index == 0) // tripped
boolean ranAction = false;
try
final Runnable command = barrierCommand;
if (command != null)
command.run(); // 是由第parties个线程执行
ranAction = true;
nextGeneration();
return 0;
finally
if (!ranAction)
breakBarrier();
应用:
ExecutorService pool = Executors.newFixedThreadPool(2);
// 内部维护的count为0后, (一代中最后一个进入barrier的线程)还会将其置为2的
// 构造器传的task, 也是由(一代中最后一个进入barrier的线程)来执行
CyclicBarrier barrier = new CyclicBarrier(2,() -> log.debug("所有任务执行完毕"));
for (int i = 0; i < 3; i++)
pool.submit(() ->
log.debug("task1 begin ====>");
try
barrier.await(); // 每次await,count会减1
catch (InterruptedException | BrokenBarrierException e)
e.printStackTrace();
);
pool.submit(() ->
log.debug("task2 begin ====>");
try
barrier.await();
catch (InterruptedException | BrokenBarrierException e)
e.printStackTrace();
);
02:51:54.950 [pool-1-thread-1] DEBUG cn.study.CountDown - task1 begin ====>
02:51:54.950 [pool-1-thread-2] DEBUG cn.study.CountDown - task2 begin ====>
02:51:54.952 [pool-1-thread-2] DEBUG cn.study.CountDown - 所有任务执行完毕// 这个task是由(这一代中最后一个进入barrier的线程)运行的
02:51:54.953 [pool-1-thread-2] DEBUG cn.study.CountDown - task2 begin ====>
02:51:54.953 [pool-1-thread-1] DEBUG cn.study.CountDown - task1 begin ====>
02:51:54.953 [pool-1-thread-1] DEBUG cn.study.CountDown - 所有任务执行完毕// 这个task是由(这一代中最后一个进入barrier的线程)运行的
02:51:54.953 [pool-1-thread-1] DEBUG cn.study.CountDown - task1 begin ====>
02:51:54.953 [pool-1-thread-2] DEBUG cn.study.CountDown - task2 begin ====>
02:51:54.953 [pool-1-thread-2] DEBUG cn.study.CountDown - 所有任务执行完毕// 这个task是由(这一代中最后一个进入barrier的线程)运行的
源码分析-CyclicBarrier
CyclicBarrier
从用法上来说,CyclicBarrier可能看出是CountDownLatch的高级版本,增加了重置的功能,对于多个线程的中断提供了通知的功能。
具体的用法通过api就有比较详细的介绍。
内部类Generation-如何实现重置功能的
首先CyclicBarrier内部有一个内部静态类Generation。当然在每个CyclicBarrier实例中也有一个Generation域
这个类只有一个内部域broken用来表示当前的屏障是否被打破了。
private static class Generation
boolean broken = false;
Generation只在线程不中断的情况下用来判断CyclicBarrier的状态的。
是由于有count个线程调用了await来正常中断的——即所谓的开闸状态。
还是由于其他特殊原因打破了CyclicBarrier(也就是当前CyclicBarrier无效了)——即所谓的打破状态。
而如果需要重置也就是讲CyclicBarrier实例中的域来重新构建一个新的Generation就可以了。
工作原理
域
private final ReentrantLock lock = new ReentrantLock();//所有方法都通过这个锁来同步。之所以不使用内置锁主要是因为需要抛出异常。此外这里需要的实际上是共享锁,而内置锁不能实现共享锁。
private final Condition trip = lock.newCondition();//通过lock得到的一个状态变量
private final int parties;//通过构造器传入的参数,表示总的等待线程的数量。
private final Runnable barrierCommand;//当屏障正常打开后运行的程序,通过最后一个调用await的线程来执行。
private Generation generation = new Generation();当前的Generation。每当屏障失效或者开闸之后都会自动替换掉。从而实现重置的功能。
锁、条件队列、状态变量、条件谓词之间的关系。
方法
最主要的就是await()方法。
实现的功能:
调用await()的线程会等待直到有足够数量的线程调用await——也就是开闸状态,
- 当最后一个线程到达
或者出现下面的情况——也就是打破状态。
- 有其他线程中断当前线程。则抛出interruptException
- 指定了限时操作,并到达线程,则抛出TimeoutException
- 如果barrier被重置,或者屏障处于打破状态,则抛出BrokenBarrierException
什么样的情况会出现打破状态?当任意等待线程抛出BrokenBarrierException的时候会使得当前屏障处于打破状态。
await方法是通过一个内部方法dowait来实现的。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException
final ReentrantLock lock = this.lock;
lock.lock();
try
final Generation g = generation;
if (g.broken)//如果当前Generation是处于打破状态则传播这个BrokenBarrierExcption
throw new BrokenBarrierException();
if (Thread.interrupted())
breakBarrier();//如果当前线程被中断则使得当前generation处于打破状态,重置剩余count。并且唤醒状态变量。这时候其他线程会传播BrokenBarrierException.
throw new InterruptedException();
int index = --count;//尝试降低当前count
if (index == 0) // tripped//如果当前状态将为0,则Generation处于开闸状态。运行可能存在的command,设置下一个Generation。相当于每次开闸之后都进行了一次reset。
boolean ranAction = false;
try
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
finally
if (!ranAction)//如果运行command失败也会导致当前屏障被打破。
breakBarrier();
// loop until tripped, broken, interrupted, or timed out
for (;;)
try
if (!timed)//阻塞在当前的状态变量。
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
catch (InterruptedException ie)
if (g == generation && ! g.broken) //如果当前线程被中断了则使得屏障被打破。并抛出异常。
breakBarrier();
throw ie;
else
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();//这种捕获了InterruptException之后调用Thread.currentThread().interrupt()是一种通用的方式。但是之前源码中好像都没有体现。我第一次见这个好像是java并发实践中。这样做的目的是什么?其实就是为了保存中断状态,从而让其他更高层次的代码注意到这个中断。但是需要注意的是这里需要其他代码予以配合才行否则这样做其实是比较危险的一种方式,因为这相当于吞了这个异常。
//从阻塞恢复之后,需要重新判断当前的状态。
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L)
breakBarrier();
throw new TimeoutException();
finally
lock.unlock();
此外再看下两个小过程:
这两个小过程当然是需要锁的,但是由于这两个方法只是通过其他方法调用,所以依然是在持有锁的范围内运行的。这两个方法都是对域进行操作。
nextGeneration实际上在屏障开闸之后重置状态。以待下一次调用。
breakBarrier实际上是在屏障打破之后设定打破状态,以唤醒其他线程并通知。
private void nextGeneration()
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
private void breakBarrier()
generation.broken = true;
count = parties;
trip.signalAll();
reset
reset方法比较简单。但是这里还是要注意一下要先打破当前屏蔽,然后再重建一个新的屏蔽。否则的话可能会导致信号丢失。
public void reset()
final ReentrantLock lock = this.lock;
lock.lock();
try
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
finally
lock.unlock();
以上是关于CountDownLatch和CyclicBarrier的主要内容,如果未能解决你的问题,请参考以下文章
CyclicBarrier和CountDownLatch的差别
基于 AQS 的并发编程: CountDownLatch 和 semaphore