JUCCyclicBarrier源码分析
Posted LL.LEBRON
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUCCyclicBarrier源码分析相关的知识,希望对你有一定的参考价值。
CyclicBarrier
1.简介
循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await()
方法进行等待,当等待的线程数满足『计数个数』时,继续执行。
@Slf4j
public class TestCyclicBarrier
public static void main(String[] args)
ExecutorService service = Executors.newFixedThreadPool(2);
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () ->
log.debug("task1,task2 finish...");
);
//当计数变为0后,在次调用await会初始化为原来的值
for (int i = 0; i < 3; i++)
service.submit(() ->
log.debug("task1 begin...");
Sleeper.sleep(1);
try
cyclicBarrier.await();
catch (InterruptedException | BrokenBarrierException e)
e.printStackTrace();
);
service.submit(() ->
log.debug("task2 begin...");
Sleeper.sleep(2);
try
cyclicBarrier.await();
catch (InterruptedException | BrokenBarrierException e)
e.printStackTrace();
);
service.shutdown();
结果:
2.内部类
Generation,中文翻译为代,一代人的代,用于控制CyclicBarrier的循环使用。而CountDownLatch则做不到这一点,CountDownLatch是一次性的,无法重置其次数。
private static class Generation
boolean broken = false;
3.主要属性
//重入锁
private final ReentrantLock lock = new ReentrantLock();
//条件锁,名称为trip,绊倒的意思,可能是指线程来了先绊倒,等达到一定数量了再唤醒
private final Condition trip = lock.newCondition();
//需要等待的线程数量
private final int parties;
//当唤醒的时候执行的命令
private final Runnable barrierCommand;
//代
private Generation generation = new Generation();
//当前这一代还需要等待的线程数
private int count;
4.构造方法
构造方法需要传入一个parties变量,也就是需要等待的线程数。
public CyclicBarrier(int parties, Runnable barrierAction)
if (parties <= 0) throw new IllegalArgumentException();
//初始化parties
this.parties = parties;
//初始化count等于parties
this.count = parties;
//初始化都到达栅栏处执行的命令
this.barrierCommand = barrierAction;
5.await()方法
每个需要在栅栏处等待的线程都需要显式地调用await()方法等待其它线程的到来。
public int await() throws InterruptedException, BrokenBarrierException
try
//调用dowait方法,不需要超时
return dowait(false, 0L);
catch (TimeoutException toe)
throw new Error(toe); // cannot happen
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try
//当前代
final Generation g = generation;
//检查
if (g.broken)
throw new BrokenBarrierException();
//中断检查
if (Thread.interrupted())
breakBarrier();
throw new InterruptedException();
//count的值减1
int index = --count;
//如果数量减到0了,走这段逻辑(最后一个线程走这里)
if (index == 0) // tripped
boolean ranAction = false;
try
//如果初始化的时候传了命令,这里执行
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//调用下一代方法
nextGeneration();
return 0;
finally
if (!ranAction)
breakBarrier();
// 这个循环只有非最后一个线程可以走到
for (;;)
try
if (!timed)
//调用condition的await()方法
trip.await();
else if (nanos > 0L)
//超时等待方法
nanos = trip.awaitNanos(nanos);
catch (InterruptedException ie)
if (g == generation && ! g.broken)
breakBarrier();
throw ie;
else
Thread.currentThread().interrupt();
//检查
if (g.broken)
throw new BrokenBarrierException();
//正常来说这里肯定不相等
//因为上面打破栅栏的时候调用nextGeneration()方法时generation的引用已经变化了
if (g != generation)
return index;
//超时检查
if (timed && nanos <= 0L)
breakBarrier();
throw new TimeoutException();
finally
lock.unlock();
private void nextGeneration()
// 调用condition的signalAll()将其队列中的等待者全部转移到AQS的队列中
trip.signalAll();
// 重置count
count = parties;
// 进入下一代
generation = new Generation();
dowait()
方法里的整个逻辑分成两部分:
- 最后一个线程走上面的逻辑,当count减为0的时候,打破栅栏,它调用
nextGeneration()
方法通知条件队列中的等待线程转移到AQS
的队列中等待被唤醒,并进入下一代。 - 非最后一个线程走下面的for循环逻辑,这些线程会阻塞在
condition
的await()
方法处,它们会加入到条件队列中,等待被通知,当它们唤醒的时候已经更新换“代”了,这时候返回。
6.总结
- CyclicBarrier会使一组线程阻塞在await()处,当最后一个线程到达时唤醒(只是从条件队列转移到AQS队列中)前面的线程大家再继续往下走
- CyclicBarrier不是直接使用AQS实现的一个同步器
- CyclicBarrier基于ReentrantLock及其Condition实现整个同步逻辑
CyclicBarrier与CountDownLatch的异同?
- 两者都能实现阻塞一组线程等待被唤醒
- 前者是最后一个线程到达时自动唤醒
- 后者是通过显式地调用countDown()实现的
- 前者是通过重入锁及其条件锁实现的,后者是直接基于AQS实现的
- 前者具有“代”的概念,可以重复使用,后者只能使用一次
- 前者只能实现多个线程到达栅栏处一起运行
- 后者不仅可以实现多个线程等待一个线程条件成立,还能实现一个线程等待多个线程条件成立
以上是关于JUCCyclicBarrier源码分析的主要内容,如果未能解决你的问题,请参考以下文章