AQS源码探究_08 CyclicBarrier源码分析
Posted 兴趣使然の草帽路飞
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS源码探究_08 CyclicBarrier源码分析相关的知识,希望对你有一定的参考价值。
1、简介
CyclicBarrier,回环栅栏,它会阻塞一组线程直到这些线程同时达到某个条件才继续执行。它与CountDownLatch很类似,但又不同,CountDownLatch需要调用countDown()
方法触发事件,而CyclicBarrier不需要,它就像一个栅栏一样,当一组线程都到达了栅栏处才继续往下走。
- 工作原理图:
- CyclicBarrier与CountDownLatch的异同?
- 两者都能实现阻塞一组线程等待被唤醒;
- 前者是最后一个线程到达时自动唤醒;
- 后者是通过显式地调用countDown()实现的;
- 前者是通过重入锁及其条件锁实现的,后者是直接基于AQS实现的;
- 前者具有“代”的概念,可以重复使用,后者只能使用一次;
- 前者只能实现多个线程到达栅栏处一起运行;
- 后者不仅可以实现多个线程等待一个线程条件成立,还能实现一个线程等待多个线程条件成立(详见CountDownLatch那章使用案例);
2、入门案例
在分析源码之前,先看一个入门案例:
- 使用一个CyclicBarrier使得5个玩家线程保持同步,当5个线程同时到达
cyclicBarrier.await();
处,大家再一起往下运行。
/**
* date: 2021/5/10
*
* @author csp
*/
public class CyclicBarrierTest01 {
/**
* 案例:
* 模拟过 “王者荣耀” 游戏开始逻辑
*/
public static void main(String[] args) {
// 第一步:定义玩家,定义5个
String[] heros = {"安琪拉", "亚瑟", "马超", "张飞", "刘备"};
// 第二步:创建固定线程数量的线程池,线程数量为5
ExecutorService service = Executors.newFixedThreadPool(5);
// 第三步:创建barrier,parties 设置为5
CyclicBarrier barrier = new CyclicBarrier(5);
// 第四步:通过for循环开启5任务,模拟开始游戏,传递给每个任务(英雄名称和barrier)
for (int i = 0; i < 5; i++) {
service.execute(new Player(heros[i], barrier));
}
// 所有线程执行完毕,关闭线程池释放资源
service.shutdown();
}
/**
* 玩家线程:
*/
static class Player implements Runnable {
// 英雄名称
private String hero;
// barrier
private CyclicBarrier barrier;
public Player(String hero, CyclicBarrier barrier) {
this.hero = hero;
this.barrier = barrier;
}
@Override
public void run() {
try {
// 每个玩家加载进度不一样,这里使用随机数来模拟!
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
System.out.println(hero + ":加载进度100%,等待其他玩家加载完成中...");
// 只有当5个玩家线程都加载完毕后,栅栏才放行!
barrier.await();
System.out.println(hero + ":发现所有英雄加载完成,开始战斗吧!");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
运行结果如下:
张飞:加载进度100%,等待其他玩家加载完成中...
亚瑟:加载进度100%,等待其他玩家加载完成中...
马超:加载进度100%,等待其他玩家加载完成中...
安琪拉:加载进度100%,等待其他玩家加载完成中...
刘备:加载进度100%,等待其他玩家加载完成中...
刘备:发现所有英雄加载完成,开始战斗吧!
张飞:发现所有英雄加载完成,开始战斗吧!
安琪拉:发现所有英雄加载完成,开始战斗吧!
马超:发现所有英雄加载完成,开始战斗吧!
亚瑟:发现所有英雄加载完成,开始战斗吧!
3、源码分析
成员属性
// 重入锁: 因为barrier实现是依赖于Condition条件队列的,condition条件队列必须依赖lock才能使用。
private final ReentrantLock lock = new ReentrantLock();
// 条件锁,名称为trip,绊倒的意思,可能是指线程来了先绊倒,等达到一定数量了再唤醒:
// 线程挂起实现使用的condition队列。
// 条件:当前代所有线程到位,这个条件队列内的线程才会被唤醒。
private final Condition trip = lock.newCondition();
// // 需要等待的线程数量: Barrier需要参与进来的线程数量
private final int parties;
// 当唤醒的时候执行的命令: 当前代最后一个到位的线程需要执行的事件
private final Runnable barrierCommand;
// 代: 表示barrier对象 当前 “代”
private Generation generation = new Generation();
// 当前这一代还需要等待的线程数:
// 表示当前“代”还有多少个线程未到位,初始值为parties。
private int count;
内部类
Generation,中文翻译为"代",一代人的代,用于控制CyclicBarrier的循环使用。
比如,上面示例中的5个线程完成后进入下一代,继续等待5个线程达到栅栏处再一起执行,而CountDownLatch则做不到这一点,CountDownLatch是一次性的,无法重置其次数。
/**
* 表示:“代”
*/
private static class Generation {
// 表示当前“代”是否被打破,如果代被打破,那么再来到这一代的线程 就会直接抛出BrokenException异常
// 且在这一代挂起的线程都会被唤醒,然后抛出 BrokerException异常。
boolean broken = false;
}
构造方法
构造方法需要传入一个parties变量,也就是需要等待的线程数。
/*
* parties:Barrier需要参与的线程数量,每次屏障需要参与的线程数
* barrierAction:当前“代”最后一个到位的线程,需要执行的事件(可以为null)
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
// 因为小于等于0的barrier没有任何意义..没有任何线程可以参与进来~
if (parties <= 0) throw new IllegalArgumentException();
// 初始化parties
this.parties = parties;
// 初始化count等于parties,后面当前代每到位一个线程,count--
this.count = parties;
// 初始化都到达栅栏处执行的命令
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
成员方法
1. nextGeneration方法
/**
* 开启下一代的方法,当这一代所有线程到位后(假设barrierCommand不为空,还需要最后一个线程执行完事件),
* 会调用nextGeneration()开启新的一代。
*/
private void nextGeneration() {
// 将在trip条件队列内挂起的线程全部唤醒
trip.signalAll();
// 重置count为parties
count = parties;
// 开启新的一代..使用一个新的generation对象,表示新的一代,新的一代和上一代没有任何关系。
generation = new Generation();
}
2. breakBarrier方法
/**
* 打破barrier屏障,在屏障内的线程都会抛出异常..
*/
private void breakBarrier() {
// 将代中的broken设置为true,表示这一代是被打破了的,再来到这一代的线程,直接抛出异常.
generation.broken = true;
// 重置count为parties
count = parties;
// 将在trip条件队列内挂起的线程全部唤醒,唤醒后的线程会检查当前代是否是打破的,
// 如果是打破的话,接下来的逻辑和开启下一代唤醒的逻辑不一样.
trip.signalAll();
}
3. await()方法
- 每个需要在栅栏处等待的线程都需要显式地调用
await()
方法等待其它线程的到来。
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
// 调用dowait方法,不需要超时
return dowait(true, unit.toNanos(timeout));
}
4. dowait方法(重点)
dowait()
方法里的整个逻辑分成两部分:
(1)最后一个线程走上面的逻辑,当count减为0的时候,打破栅栏,它调用nextGeneration()
方法通知条件队列中的等待线程转移到AQS的队列中等待被唤醒,并进入下一代。
(2)非最后一个线程走下面的for循环逻辑,这些线程会阻塞在condition的await()
方法处,它们会加入到条件队列中,等待被通知,当它们唤醒的时候已经更新换“代”了,这时候返回。
/**
* timed:表示当前调用await方法的线程是否指定了超时时长,如果true 表示 线程是响应超时的
* nanos:线程等待超时时长纳秒,如果 timed == false ===> nanos == 0
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 获取barrier全局锁对象
final ReentrantLock lock = this.lock;
// 加锁
// 为什么要加锁呢?
// 因为 barrier的挂起和唤醒依赖的组件都是condition。
lock.lock();
try {
// 当前代: 获取barrier当前的 “代”
final Generation g = generation;
// 检查: 如果当前代是已经被打破状态,则当前调用await方法的线程,直接抛出Broken异常
if (g.broken)
throw new BrokenBarrierException();
// 中断检查: 如果当前线程的中断标记位为true,则打破当前代,然后当前线程抛出中断异常
if (Thread.interrupted()) {
// 1.设置当前代的状态为broken状态
// 2.唤醒在trip条件队列内的线程
breakBarrier();
throw new InterruptedException();
}
// 执行到这里,说明 当前线程中断状态是正常的 false, 当前代的broken为 false(未打破状态)
// 正常逻辑...
// count的值减1
// 假设 parties 给的是 5,那么index对应的值为 4,3,2,1,0
int index = --count;
// 如果数量减到0了,走这段逻辑(最后一个线程走这里):
// 条件成立:说明当前线程是最后一个到达barrier的线程,此时需要做什么呢?
if (index == 0) { // tripped
// 标记:true表示 最后一个线程 执行cmd时未抛异常。 false,表示最后一个线程执行cmd时抛出异常了.
// cmd就是创建 barrier对象时 指定的第二个 Runnable接口实现,这个可以为null
boolean ranAction = false;
try {
// 如果初始化的时候传了命令,这里执行
final Runnable command = barrierCommand;
// 条件成立:说明创建barrier对象时 指定 Runnable接口了,这个时候最后一个到达的线程 就需要执行这个接口
if (command != null)
command.run();
// command.run()未抛出异常的话,那么线程会执行到这里。
ranAction = true;
// 调用下一代方法: 开启新的一代
// 1.唤醒trip条件队列内挂起的线程,被唤醒的线程 会依次 获取到lock,然后依次退出await方法。
// 2.重置count 为 parties
// 3.创建一个新的generation对象,表示新的一代
nextGeneration();
// 返回0,因为当前线程是此 代 最后一个到达的线程,所以Index == 0
return 0;
} finally {
if (!ranAction)
// 如果command.run()执行抛出异常的话,会进入到这里。
breakBarrier();
}
}
// 执行到这里,说明当前线程 并不是最后一个到达Barrier的线程..此时需要进入一个自旋中.
// 这个循环只有非最后一个线程可以走到
// 自旋,一直到条件满足、当前代被打破、线程被中断,等待超时
for (;;) {
try {
// 条件成立:说明当前线程是不指定超时时间的
if (!timed)
// 调用condition的await()方法:
// 当前线程 会 释放掉lock,然后进入到trip条件队列的尾部,然后挂起自己,等待被唤醒。
trip.await();
else if (nanos > 0L)
// 超时等待方法:
// 说明当前线程调用await方法时 是指定了超时时间的!
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 抛出中断异常,会进来这里。
// 什么时候会抛出InterruptedException异常呢?
// Node节点在 条件队列内 时 收到中断信号时 会抛出中断异常!
// 条件一:g == generation 成立,说明当前代并没有变化。
// 条件二:! g.broken 当前代如果没有被打破,那么当前线程就去打破,并且抛出异常..
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// 执行到else有几种情况?
// 1.代发生了变化,这个时候就不需要抛出中断异常了,因为 代已经更新了,这里唤醒后就走正常逻辑了..只不过设置下 中断标记。
// 2.代没有发生变化,但是代被打破了,此时也不用返回中断异常,执行到下面的时候会抛出 brokenBarrier异常。也记录下中断标记位。
Thread.currentThread().interrupt();
}
}
// 唤醒后,执行到这里,有几种情况?
// 1.正常情况,当前barrier开启了新的一代(trip.signalAll())
// 2.当前Generation被打破,此时也会唤醒所有在trip上挂起的线程
// 3.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。
// 检查:
// 条件成立:当前代已经被打破
if (g.broken)
// 线程唤醒后依次抛出BrokenBarrier异常。
throw new BrokenBarrierException();
// 唤醒后,执行到这里,有几种情况?
// 1.正常情况,当前barrier开启了新的一代(trip.signalAll()),
// 3.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。
// 正常来说这里肯定不相等
// 因为上面打破栅栏的时候调用nextGeneration()方法时generation的引用已经变化了
// 条件成立:说明当前线程挂起期间,最后一个线程到位了,然后触发了开启新的一代的逻辑,此时唤醒trip条件队列内的线程。
if (g != generation)
// 返回当前线程的index。
return index;
// 唤醒后,执行到这里,有几种情况?
// 3.当前线程trip中等待超时,然后主动转移到阻塞队列然后获取到锁 唤醒。
// 超时检查
if (timed && nanos <= 0L) {
// 打破barrier
breakBarrier();
// 抛出超时异常.
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
4、总结
- CyclicBarrier会使一组线程阻塞在await()处,当最后一个线程到达时唤醒(只是从条件队列转移到AQS队列中)前面的线程大家再继续往下走;
- CyclicBarrier不是直接使用AQS实现的一个同步器;
- CyclicBarrier基于ReentrantLock及其Condition实现整个同步逻辑;
以上是关于AQS源码探究_08 CyclicBarrier源码分析的主要内容,如果未能解决你的问题,请参考以下文章