JAVA并发包源码分析循环栅栏:CyclicBarrier
Posted 不清不慎的博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JAVA并发包源码分析循环栅栏:CyclicBarrier相关的知识,希望对你有一定的参考价值。
一、认识CyclicBarrier
对于CyclicBarrier大多数人感到陌生,其实CyclicBarrier是一种多线程并发控制使用工具,和CountDownLatch非常类似,实现线程之间的计数等待,也就是说一个线程或者多个线程等待其他线程完成任务,不过比CountDowwnLatch复杂。
CyclicBarrier是循环栅栏的意思,所谓栅栏就是障碍物,阻止其他人进入,在多线程中,使用该工具类就是阻止线程执行,那么它是怎么阻止的呢?下面会详细介绍。前面的Cyclic意为循环,也就是说可以循环使用该计数器。举个简单例子,比如有5个线程,那么该工具类就要等待这五个线程都到达指定的障碍点,执行完相应的动作后,计数器才会清零,等待下一批线程的到达。
下面我们来看看CyclicBarrier内部的构造以及类之间的依赖关系:
上图是CyclicBarrier内部的部分代码,由上图可以画出该工具类的构造图如下:
二、使用场景
对于该工具类使用场景也很丰富,这里用一个简单的实例来说明。比如,这里有10个士兵司令下达命令,要求这10个士兵先全部集合来报道,报道完成之后再一起去执行任务,当每一个士兵的任务完成之后然后才会向司令报告任务执行完毕。
1 public CyclicBarrier(int parties, Runnable barrierAction) {
2 if (parties <= 0) throw new IllegalArgumentException();
3 this.parties = parties;
4 this.count = parties;
5 this.barrierCommand = barrierAction;
6 }
对于上述的CyclicBarrier构造方法,它接收两个参数,第一个参数就是计数器总数,参与计数的线程总数,第二个参数barrierAction是一个Runnable接口,它是当一次计数完成之后要做的动作。
对于上述案例,我们来用代码演示该场景:
1 package cn.just.thread.concurrent;
2import java.util.Random;
3import java.util.concurrent.BrokenBarrierException;
4import java.util.concurrent.CyclicBarrier;
5/**
6 * 测试循环栅栏:CycleBarrier(int parties,Runnable barrierAction);
7 * 第一个参数表示计数的总数,即参与的线程总数
8 * 第二个参数表示当一次计数完成后,系统会执行的动作
9 * @author Shinelon
10 *
11 */
12public class CycleBarrierDemo {
13 public static class Soldier implements Runnable{
14 private String soldier;
15 private final CyclicBarrier cyclic;
16 public Soldier(String soldier, CyclicBarrier cyclic) {
17 super();
18 this.soldier = soldier;
19 this.cyclic = cyclic;
20 }
21 @Override
22 public void run() {
23 try{
24 //等待所有士兵到齐
25 cyclic.await();
26 doWork();
27 //等待所有士兵去工作
28 cyclic.await();
29 }catch (InterruptedException e) {
30 e.printStackTrace();
31 }catch (BrokenBarrierException e) {
32 e.printStackTrace();
33 }
34 }
35 private void doWork() {
36 try{
37 Thread.sleep(Math.abs(new Random().nextInt()%10000));
38 }catch (InterruptedException e) {
39 e.printStackTrace();
40 }
41 System.out.println(soldier+":任务完成!");
42 }
43 }
44 public static class BarrierRun implements Runnable{
45 boolean flag;
46 int N;
47 public BarrierRun(boolean flag, int n) {
48 super();
49 this.flag = flag;
50 N = n;
51 }
52 @Override
53 public void run() {
54 if(flag){
55 System.out.println("司令:【士兵"+N+"个,任务完成】");
56 }else{
57 System.out.println("司令:【士兵"+N+"个,集合完毕】");
58 flag=true;
59 }
60 }
61 }
62 public static void main(String[] args) {
63 final int N=10;
64 Thread[] allSoldier=new Thread[N];
65 boolean flag=false;
66 CyclicBarrier cyclic=new CyclicBarrier(N, new BarrierRun(flag, N));
67 //设置障碍点,主要是为了执行这个方法
68 System.out.println("集合队伍");
69 for(int i=0;i<N;++i){
70 System.out.println("士兵"+i+"报道!");
71 allSoldier[i]=new Thread(new Soldier("士兵"+i, cyclic));
72 allSoldier[i].start();
73 }
74 }
75}
下面是运行结果:
上面的代码中,涉及到一个该工具类的内部方法:
await()等待所有的线程计数完成。该方法内部调用dowait方法,在dowait方法中用重入锁进行加锁。实现了一次计数器的等待过程。下面我们来深入源码探究。
三、深入源码
上面说道dowait方法,下面是该方法的源码:
1 private int dowait(boolean timed, long nanos)
2 throws InterruptedException, BrokenBarrierException,
3 TimeoutException {
4 final ReentrantLock lock = this.lock;
5 lock.lock();
6 try {
7 //标志着每一个线程,当一个线程到来就生成一个新生代
8 final Generation g = generation;
9 //当计数器被破坏,抛出BrokenBarrierException异常
10 if (g.broken)
11 throw new BrokenBarrierException();
12 //当线程被中断。抛出中断异常
13 if (Thread.interrupted()) {
14 breakBarrier();
15 throw new InterruptedException();
16 }
17 //当一个线程到来时count减1,直到count为0则计数完成
18 int index = --count;
19 if (index == 0) { // tripped
20 boolean ranAction = false;
21 try {
22 final Runnable command = barrierCommand;
23 if (command != null)
24 command.run();
25 ranAction = true;
26 //更新标志,唤醒所有等待线程
27 nextGeneration();
28 return 0;
29 } finally {
30 //如果计数完成,唤醒所有等待的线程,计数器重新开始工作
31 if (!ranAction)
32 breakBarrier();
33 }
34 }
35 // loop until tripped, broken, interrupted, or timed out
36 for (;;) {
37 try {
38 //如果当前线程没有超时则继续等待
39 if (!timed)
40 trip.await();
41 //如果调用超时,调用awaitNanos方法等待
42 else if (nanos > 0L)
43 nanos = trip.awaitNanos(nanos);
44 } catch (InterruptedException ie) {
45 //如果所有线程都已经到达或者被中断则计数完成,进入下一次循环
46 if (g == generation && ! g.broken) {
47 breakBarrier();
48 throw ie;
49 } else {
50 // We're about to finish waiting even if we had not
51 // been interrupted, so this interrupt is deemed to
52 // "belong" to subsequent execution.
53 Thread.currentThread().interrupt();
54 }
55 }
56 if (g.broken)
57 throw new BrokenBarrierException();
58 //如果不是同一个线程,则返回index
59 if (g != generation)
60 return index;
61 if (timed && nanos <= 0L) {
62 breakBarrier();
63 throw new TimeoutException();
64 }
65 }
66 } finally {
67 //释放锁
68 lock.unlock();
69 }
70 }
解释一下上面的源代码,对于每一个线程,它都会有一个generation进行标志用来区分不同的线程(我是这样理解的),因为generation对象中有一个属性broken标志着是否该计数器被破坏或者计数是否完成,默认是false:
1 private static class Generation {
2 boolean broken = false;
3 }
CyclicBarrier设置了两个异常,一个是BrokenBarrierException,另一个InterruptedException,InterruptedException异常相信大家都很熟悉,如果发生中断则抛出异常,BrokenBarrierException异常是当计数器被破坏的时候抛出。当一个线程来到的时候count-1,然后判断count是否为0,如果为零则计数完成,则执行下面相应的动作进入下一次的循环计数:
1 final Runnable command = barrierCommand;
2 if (command != null)
3 command.run();
4 ranAction = true;
5 //更新标志
6 nextGeneration();
1 /**
2 * Updates state on barrier trip and wakes up everyone.
3 * Called only while holding lock.
4 */
5 private void nextGeneration() {
6 // signal completion of last generation
7 trip.signalAll();
8 // set up next generation
9 count = parties;
10 generation = new Generation();
11 }
依据上面的场景我们可以理解,10个士兵执行任务,count为10,每次到来一个士兵则count-1,当10个士兵全部到来时则count为0,然后执行BarrierRun线程执行相应的动作。接着调用nextGeneration方法更新标志并且唤醒所有等待的线程继续向下执行。
在判断计数器是否完成一次计数时它调用breakBarrier()方法:
1/**
2 * Sets current barrier generation as broken and wakes up everyone.
3 * Called only while holding lock.
4 */
5 private void breakBarrier() {
6 generation.broken = true;
7 count = parties;
8 trip.signalAll();
9 }
这个方法同样会更新标志并且唤醒所有等待的线程。
在接下来的整个for循环中,判断了当前线程是否被中断,计数器是否被破坏,等待是否超时。
如果等待超时则调用awaitNanos方法继续等待,该方法时Contition接口的实现类的一个方法,让线程在合适的时间进行等待或者在特定的时间内得到通知,继续执行,该方法内部实现复杂,笔者能力有限,这里就不进行分析了。有兴趣的话可以自己查看源码。
2. 它会判断所有线程是否都已经到达,如果所有线程已经执行完毕到达则进行下一次循环:
1//如果所有线程都已经到达或者被中断则计数完成,进入下一次循环
2 if (g == generation && ! g.broken) {
3 breakBarrier();
4 throw ie;
5 }
3.同时它也会判断是否是同一个线程,并且更新标志。
1//如果不是同一个线程,则返回index
2 if (g != generation)
3 return index;
当该线程的所有任务都执行完毕后它就会释放锁。
至此,本文已经介绍完CyclicBarrier工具类的介绍,本人能力有限,如有不足之处还请指教。多谢!
原文出自【不清不慎的博客】
以上是关于JAVA并发包源码分析循环栅栏:CyclicBarrier的主要内容,如果未能解决你的问题,请参考以下文章