AQS源码探究_08 CyclicBarrier源码分析

Posted 兴趣使然の草帽路飞

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS源码探究_08 CyclicBarrier源码分析相关的知识,希望对你有一定的参考价值。

1、简介

CyclicBarrier,回环栅栏,它会阻塞一组线程直到这些线程同时达到某个条件才继续执行。它与CountDownLatch很类似,但又不同,CountDownLatch需要调用countDown()方法触发事件,而CyclicBarrier不需要,它就像一个栅栏一样,当一组线程都到达了栅栏处才继续往下走。

  • 工作原理图:

请添加图片描述

请添加图片描述

  • CyclicBarrier与CountDownLatch的异同?
    • 两者都能实现阻塞一组线程等待被唤醒;
    • 前者是最后一个线程到达时自动唤醒;
    • 后者是通过显式地调用countDown()实现的;
    • 前者是通过重入锁及其条件锁实现的,后者是直接基于AQS实现的;
    • 前者具有“代”的概念,可以重复使用,后者只能使用一次;
    • 前者只能实现多个线程到达栅栏处一起运行;
    • 后者不仅可以实现多个线程等待一个线程条件成立,还能实现一个线程等待多个线程条件成立(详见CountDownLatch那章使用案例);

2、入门案例

在分析源码之前,先看一个入门案例:

  • 使用一个CyclicBarrier使得5个玩家线程保持同步,当5个线程同时到达 cyclicBarrier.await();处,大家再一起往下运行。
/**
 * date: 2021/5/10
 *
 * @author csp
 */
public class CyclicBarrierTest01 {
    /**
     * 案例:
     * 模拟过 “王者荣耀” 游戏开始逻辑
     */
    public static void main(String[] args) {
        // 第一步:定义玩家,定义5个
        String[] heros = {"安琪拉", "亚瑟", "马超", "张飞", "刘备"};

        // 第二步:创建固定线程数量的线程池,线程数量为5
        ExecutorService service = Executors.newFixedThreadPool(5);

        // 第三步:创建barrier,parties 设置为5
        CyclicBarrier barrier = new CyclicBarrier(5);

        // 第四步:通过for循环开启5任务,模拟开始游戏,传递给每个任务(英雄名称和barrier)
        for (int i = 0; i < 5; i++) {
            service.execute(new Player(heros[i], barrier));
        }

        // 所有线程执行完毕,关闭线程池释放资源
        service.shutdown();
    }

    /**
     * 玩家线程:
     */
    static class Player implements Runnable {
        // 英雄名称
        private String hero;
        // barrier
        private CyclicBarrier barrier;

        public Player(String hero, CyclicBarrier barrier) {
            this.hero = hero;
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                // 每个玩家加载进度不一样,这里使用随机数来模拟!
                TimeUnit.SECONDS.sleep(new Random().nextInt(10));
                System.out.println(hero + ":加载进度100%,等待其他玩家加载完成中...");
                // 只有当5个玩家线程都加载完毕后,栅栏才放行!
                barrier.await();
                System.out.println(hero + ":发现所有英雄加载完成,开始战斗吧!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果如下:

张飞:加载进度100%,等待其他玩家加载完成中...
亚瑟:加载进度100%,等待其他玩家加载完成中...
马超:加载进度100%,等待其他玩家加载完成中...
安琪拉:加载进度100%,等待其他玩家加载完成中...
刘备:加载进度100%,等待其他玩家加载完成中...
刘备:发现所有英雄加载完成,开始战斗吧!
张飞:发现所有英雄加载完成,开始战斗吧!
安琪拉:发现所有英雄加载完成,开始战斗吧!
马超:发现所有英雄加载完成,开始战斗吧!
亚瑟:发现所有英雄加载完成,开始战斗吧!

3、源码分析

成员属性

// 重入锁: 因为barrier实现是依赖于Condition条件队列的,condition条件队列必须依赖lock才能使用。
private final ReentrantLock lock = new ReentrantLock();

// 条件锁,名称为trip,绊倒的意思,可能是指线程来了先绊倒,等达到一定数量了再唤醒:
// 线程挂起实现使用的condition队列。
// 条件:当前代所有线程到位,这个条件队列内的线程才会被唤醒。
private final Condition trip = lock.newCondition();

// // 需要等待的线程数量: Barrier需要参与进来的线程数量
private final int parties;

// 当唤醒的时候执行的命令: 当前代最后一个到位的线程需要执行的事件
private final Runnable barrierCommand;

// 代: 表示barrier对象 当前 “代”
private Generation generation = new Generation();

// 当前这一代还需要等待的线程数: 
// 表示当前“代”还有多少个线程未到位,初始值为parties。
private int count;

内部类

Generation,中文翻译为"代",一代人的代,用于控制CyclicBarrier的循环使用。

比如,上面示例中的5个线程完成后进入下一代,继续等待5个线程达到栅栏处再一起执行,而CountDownLatch则做不到这一点,CountDownLatch是一次性的,无法重置其次数。

/**
 * 表示:“代”
 */
private static class Generation {
    // 表示当前“代”是否被打破,如果代被打破,那么再来到这一代的线程 就会直接抛出BrokenException异常
    // 且在这一代挂起的线程都会被唤醒,然后抛出 BrokerException异常。
    boolean broken = false;
}

构造方法

构造方法需要传入一个parties变量,也就是需要等待的线程数。

/* 
 * parties:Barrier需要参与的线程数量,每次屏障需要参与的线程数
 * barrierAction:当前“代”最后一个到位的线程,需要执行的事件(可以为null)
 */
public CyclicBarrier(int parties, Runnable barrierAction) {
    // 因为小于等于0的barrier没有任何意义..没有任何线程可以参与进来~
    if (parties <= 0) throw new IllegalArgumentException();
    // 初始化parties
    this.parties = parties;
	// 初始化count等于parties,后面当前代每到位一个线程,count--
    this.count = parties;
    // 初始化都到达栅栏处执行的命令
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

成员方法

1. nextGeneration方法

/**
 * 开启下一代的方法,当这一代所有线程到位后(假设barrierCommand不为空,还需要最后一个线程执行完事件),
 * 会调用nextGeneration()开启新的一代。
 */
private void nextGeneration() {
    // 将在trip条件队列内挂起的线程全部唤醒
    trip.signalAll();

    // 重置count为parties
    count = parties;

    // 开启新的一代..使用一个新的generation对象,表示新的一代,新的一代和上一代没有任何关系。
    generation = new Generation();
}

2. breakBarrier方法

/**
 * 打破barrier屏障,在屏障内的线程都会抛出异常..
 */
private void breakBarrier() {
    // 将代中的broken设置为true,表示这一代是被打破了的,再来到这一代的线程,直接抛出异常.
    generation.broken = true;
    // 重置count为parties
    count = parties;
    // 将在trip条件队列内挂起的线程全部唤醒,唤醒后的线程会检查当前代是否是打破的,
    // 如果是打破的话,接下来的逻辑和开启下一代唤醒的逻辑不一样.
    trip.signalAll();
}

3. await()方法

  • 每个需要在栅栏处等待的线程都需要显式地调用await()方法等待其它线程的到来。
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
     
	// 调用dowait方法,不需要超时           
    return dowait(true, unit.toNanos(timeout));
}

4. dowait方法(重点)

  • dowait()方法里的整个逻辑分成两部分:

(1)最后一个线程走上面的逻辑,当count减为0的时候,打破栅栏,它调用nextGeneration()方法通知条件队列中的等待线程转移到AQS的队列中等待被唤醒,并进入下一代。

(2)非最后一个线程走下面的for循环逻辑,这些线程会阻塞在condition的await()方法处,它们会加入到条件队列中,等待被通知,当它们唤醒的时候已经更新换“代”了,这时候返回。

/**
 * timed:表示当前调用await方法的线程是否指定了超时时长,如果true 表示 线程是响应超时的
 * nanos:线程等待超时时长纳秒,如果 timed == false ===> nanos == 0
 */
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
        TimeoutException {
    // 获取barrier全局锁对象
    final ReentrantLock lock = this.lock;
    // 加锁
    // 为什么要加锁呢?
    // 因为 barrier的挂起和唤醒依赖的组件都是condition。
    lock.lock();
    try {
        // 当前代: 获取barrier当前的 “代”
        final Generation g = generation;

        // 检查: 如果当前代是已经被打破状态,则当前调用await方法的线程,直接抛出Broken异常
        if (g.broken)
            throw new BrokenBarrierException();

        // 中断检查: 如果当前线程的中断标记位为true,则打破当前代,然后当前线程抛出中断异常
        if (Thread.interrupted()) {
            // 1.设置当前代的状态为broken状态  
            // 2.唤醒在trip条件队列内的线程
            breakBarrier();
            throw new InterruptedException();
        }

        // 执行到这里,说明 当前线程中断状态是正常的 false, 当前代的broken为 false(未打破状态)
        // 正常逻辑...
		
        // count的值减1
        // 假设 parties 给的是 5,那么index对应的值为 4,3,2,1,0
        int index = --count;
        // 如果数量减到0了,走这段逻辑(最后一个线程走这里):
        // 条件成立:说明当前线程是最后一个到达barrier的线程,此时需要做什么呢?
        if (index == 0) {  // tripped
            // 标记:true表示 最后一个线程 执行cmd时未抛异常。  false,表示最后一个线程执行cmd时抛出异常了.
            // cmd就是创建 barrier对象时 指定的第二个 Runnable接口实现,这个可以为null
            boolean ranAction = false;
            try {
				// 如果初始化的时候传了命令,这里执行
                final Runnable command = barrierCommand;
                // 条件成立:说明创建barrier对象时 指定 Runnable接口了,这个时候最后一个到达的线程 就需要执行这个接口
                if (command != null)
                    command.run();

                // command.run()未抛出异常的话,那么线程会执行到这里。
                ranAction = true;

                // 调用下一代方法: 开启新的一代
                // 1.唤醒trip条件队列内挂起的线程,被唤醒的线程 会依次 获取到lock,然后依次退出await方法。
                // 2.重置count 为 parties
                // 3.创建一个新的generation对象,表示新的一代
                nextGeneration();
                // 返回0,因为当前线程是此 代 最后一个到达的线程,所以Index == 0
                return 0;
            } finally {
                if (!ranAction)
                    // 如果command.run()执行抛出异常的话,会进入到这里。
                    breakBarrier();
            }
        }

        // 执行到这里,说明当前线程 并不是最后一个到达Barrier的线程..此时需要进入一个自旋中.

		// 这个循环只有非最后一个线程可以走到
        // 自旋,一直到条件满足、当前代被打破、线程被中断,等待超时
        for (;;) {
            try {
                // 条件成立:说明当前线程是不指定超时时间的
                if (!timed)
                    // 调用condition的await()方法:
                    // 当前线程 会 释放掉lock,然后进入到trip条件队列的尾部,然后挂起自己,等待被唤醒。
                    trip.await();
                else if (nanos > 0L)
					// 超时等待方法:
                    // 说明当前线程调用await方法时 是指定了超时时间的!
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 抛出中断异常,会进来这里。
                // 什么时候会抛出InterruptedException异常呢?
                // Node节点在 条件队列内 时 收到中断信号时 会抛出中断异常!


                // 条件一:g == generation 成立,说明当前代并没有变化。
                // 条件二:! g.broken 当前代如果没有被打破,那么当前线程就去打破,并且抛出异常..
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // 执行到else有几种情况?
                    // 1.代发生了变化,这个时候就不需要抛出中断异常了,因为 代已经更新了,这里唤醒后就走正常逻辑了..只不过设置下 中断标记。
                    // 2.代没有发生变化,但是代被打破了,此时也不用返回中断异常,执行到下面的时候会抛出  brokenBarrier异常。也记录下中断标记位。
                    Thread.currentThread().interrupt();
                }
            }

            // 唤醒后,执行到这里,有几种情况?
            // 1.正常情况,当前barrier开启了新的一代(trip.signalAll())
            // 2.当前Generation被打破,此时也会唤醒所有在trip上挂起的线程
            // 3.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。

            // 检查: 
            // 条件成立:当前代已经被打破
            if (g.broken)
                // 线程唤醒后依次抛出BrokenBarrier异常。
                throw new BrokenBarrierException();

            // 唤醒后,执行到这里,有几种情况?
            // 1.正常情况,当前barrier开启了新的一代(trip.signalAll()),
            // 3.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。
			// 正常来说这里肯定不相等
			// 因为上面打破栅栏的时候调用nextGeneration()方法时generation的引用已经变化了
           
            // 条件成立:说明当前线程挂起期间,最后一个线程到位了,然后触发了开启新的一代的逻辑,此时唤醒trip条件队列内的线程。
            if (g != generation)
                // 返回当前线程的index。
                return index;

            // 唤醒后,执行到这里,有几种情况?
            // 3.当前线程trip中等待超时,然后主动转移到阻塞队列然后获取到锁 唤醒。
            // 超时检查
            if (timed && nanos <= 0L) {
                // 打破barrier
                breakBarrier();
                // 抛出超时异常.
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

4、总结

  • CyclicBarrier会使一组线程阻塞在await()处,当最后一个线程到达时唤醒(只是从条件队列转移到AQS队列中)前面的线程大家再继续往下走;
  • CyclicBarrier不是直接使用AQS实现的一个同步器;
  • CyclicBarrier基于ReentrantLock及其Condition实现整个同步逻辑;

以上是关于AQS源码探究_08 CyclicBarrier源码分析的主要内容,如果未能解决你的问题,请参考以下文章

AQS源码探究_09 Semaphore源码分析

AQS源码探究_09 Semaphore源码分析

AQS源码探究_02 AQS简介及属性分析

AQS源码探究_02 AQS简介及属性分析

AQS源码探究_07 CountDownLatch源码分析

AQS源码探究_07 CountDownLatch源码分析