JAVA并发包源码分析循环栅栏:CyclicBarrier

Posted 不清不慎的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JAVA并发包源码分析循环栅栏:CyclicBarrier相关的知识,希望对你有一定的参考价值。

一、认识CyclicBarrier

对于CyclicBarrier大多数人感到陌生,其实CyclicBarrier是一种多线程并发控制使用工具,和CountDownLatch非常类似,实现线程之间的计数等待,也就是说一个线程或者多个线程等待其他线程完成任务,不过比CountDowwnLatch复杂。

CyclicBarrier是循环栅栏的意思,所谓栅栏就是障碍物,阻止其他人进入,在多线程中,使用该工具类就是阻止线程执行,那么它是怎么阻止的呢?下面会详细介绍。前面的Cyclic意为循环,也就是说可以循环使用该计数器。举个简单例子,比如有5个线程,那么该工具类就要等待这五个线程都到达指定的障碍点,执行完相应的动作后,计数器才会清零,等待下一批线程的到达。

下面我们来看看CyclicBarrier内部的构造以及类之间的依赖关系: 


上图是CyclicBarrier内部的部分代码,由上图可以画出该工具类的构造图如下:

【JAVA并发包源码分析】循环栅栏:CyclicBarrier


二、使用场景

对于该工具类使用场景也很丰富,这里用一个简单的实例来说明。比如,这里有10个士兵司令下达命令,要求这10个士兵先全部集合来报道,报道完成之后再一起去执行任务,当每一个士兵的任务完成之后然后才会向司令报告任务执行完毕。

1  public CyclicBarrier(int parties, Runnable barrierAction) {
2        if (parties <= 0) throw new IllegalArgumentException();
3        this.parties = parties;
4        this.count = parties;
5        this.barrierCommand = barrierAction;
6    }

对于上述的CyclicBarrier构造方法,它接收两个参数,第一个参数就是计数器总数,参与计数的线程总数,第二个参数barrierAction是一个Runnable接口,它是当一次计数完成之后要做的动作。 
对于上述案例,我们来用代码演示该场景:

 1  package cn.just.thread.concurrent;
2import java.util.Random;
3import java.util.concurrent.BrokenBarrierException;
4import java.util.concurrent.CyclicBarrier;
5/**
6 * 测试循环栅栏:CycleBarrier(int parties,Runnable barrierAction);
7 * 第一个参数表示计数的总数,即参与的线程总数
8 * 第二个参数表示当一次计数完成后,系统会执行的动作
9 * @author Shinelon
10 *
11 */

12public class CycleBarrierDemo {
13    public static class Soldier implements Runnable{
14        private String soldier;
15        private final CyclicBarrier cyclic;
16        public Soldier(String soldier, CyclicBarrier cyclic) {
17            super();
18            this.soldier = soldier;
19            this.cyclic = cyclic;
20        }
21        @Override
22        public void run() {
23            try{
24                //等待所有士兵到齐
25                cyclic.await();
26                doWork();
27                //等待所有士兵去工作
28                cyclic.await();
29            }catch (InterruptedException e) {
30                e.printStackTrace();
31            }catch (BrokenBarrierException e) {
32                e.printStackTrace();
33            }
34        }
35        private void doWork() {
36            try{
37                Thread.sleep(Math.abs(new Random().nextInt()%10000));
38            }catch (InterruptedException e) {
39                e.printStackTrace();
40            }
41            System.out.println(soldier+":任务完成!");
42        }
43    }
44    public static class BarrierRun implements Runnable{
45        boolean flag;
46        int N;
47        public BarrierRun(boolean flag, int n) {
48            super();
49            this.flag = flag;
50            N = n;
51        }
52        @Override
53        public void run() {
54            if(flag){
55                System.out.println("司令:【士兵"+N+"个,任务完成】");
56            }else{
57                System.out.println("司令:【士兵"+N+"个,集合完毕】");
58                flag=true;
59            }
60        }
61    }
62    public static void main(String[] args) {
63        final int N=10;
64        Thread[] allSoldier=new Thread[N];
65        boolean flag=false;
66        CyclicBarrier cyclic=new CyclicBarrier(N, new BarrierRun(flag, N));
67        //设置障碍点,主要是为了执行这个方法
68        System.out.println("集合队伍");
69        for(int i=0;i<N;++i){
70            System.out.println("士兵"+i+"报道!");
71            allSoldier[i]=new Thread(new Soldier("士兵"+i, cyclic));
72            allSoldier[i].start();
73        }
74    }
75}

下面是运行结果: 

上面的代码中,涉及到一个该工具类的内部方法: 
await()等待所有的线程计数完成。该方法内部调用dowait方法,在dowait方法中用重入锁进行加锁。实现了一次计数器的等待过程。下面我们来深入源码探究。

三、深入源码

上面说道dowait方法,下面是该方法的源码:

 1  private int dowait(boolean timed, long nanos)
2        throws InterruptedException, BrokenBarrierException,
3               TimeoutException
{
4        final ReentrantLock lock = this.lock;
5        lock.lock();
6        try {
7            //标志着每一个线程,当一个线程到来就生成一个新生代
8            final Generation g = generation;
9            //当计数器被破坏,抛出BrokenBarrierException异常
10            if (g.broken)
11                throw new BrokenBarrierException();
12            //当线程被中断。抛出中断异常
13            if (Thread.interrupted()) {
14                breakBarrier();
15                throw new InterruptedException();
16            }
17            //当一个线程到来时count减1,直到count为0则计数完成
18            int index = --count;
19            if (index == 0) {  // tripped
20                boolean ranAction = false;
21                try {
22                    final Runnable command = barrierCommand;
23                    if (command != null)
24                        command.run();
25                    ranAction = true;
26                    //更新标志,唤醒所有等待线程
27                    nextGeneration();
28                    return 0;
29                } finally {
30                    //如果计数完成,唤醒所有等待的线程,计数器重新开始工作
31                    if (!ranAction)
32                        breakBarrier();
33                }
34            }
35            // loop until tripped, broken, interrupted, or timed out
36            for (;;) {
37                try {
38                    //如果当前线程没有超时则继续等待
39                    if (!timed)
40                        trip.await();
41                      //如果调用超时,调用awaitNanos方法等待
42                    else if (nanos > 0L)
43                        nanos = trip.awaitNanos(nanos);
44                } catch (InterruptedException ie) {
45                    //如果所有线程都已经到达或者被中断则计数完成,进入下一次循环
46                    if (g == generation && ! g.broken) {
47                        breakBarrier();
48                        throw ie;
49                    } else {
50                        // We're about to finish waiting even if we had not
51                        // been interrupted, so this interrupt is deemed to
52                        // "belong" to subsequent execution.
53                        Thread.currentThread().interrupt();
54                    }
55                }
56                if (g.broken)
57                    throw new BrokenBarrierException();
58                //如果不是同一个线程,则返回index
59                if (g != generation)
60                    return index;
61                if (timed && nanos <= 0L) {
62                    breakBarrier();
63                    throw new TimeoutException();
64                }
65            }
66        } finally {
67            //释放锁
68            lock.unlock();
69        }
70    }

解释一下上面的源代码,对于每一个线程,它都会有一个generation进行标志用来区分不同的线程(我是这样理解的),因为generation对象中有一个属性broken标志着是否该计数器被破坏或者计数是否完成,默认是false:

1  private static class Generation {
2        boolean broken = false;
3    }

CyclicBarrier设置了两个异常,一个是BrokenBarrierException,另一个InterruptedException,InterruptedException异常相信大家都很熟悉,如果发生中断则抛出异常,BrokenBarrierException异常是当计数器被破坏的时候抛出。当一个线程来到的时候count-1,然后判断count是否为0,如果为零则计数完成,则执行下面相应的动作进入下一次的循环计数:

1  final Runnable command = barrierCommand;
2                    if (command != null)
3                        command.run();
4                    ranAction = true;
5                    //更新标志
6                    nextGeneration();


 1 /**
2     * Updates state on barrier trip and wakes up everyone.
3     * Called only while holding lock.
4     */

5 private void nextGeneration() {
6        // signal completion of last generation
7        trip.signalAll();
8        // set up next generation
9        count = parties;
10        generation = new Generation();
11    }


依据上面的场景我们可以理解,10个士兵执行任务,count为10,每次到来一个士兵则count-1,当10个士兵全部到来时则count为0,然后执行BarrierRun线程执行相应的动作。接着调用nextGeneration方法更新标志并且唤醒所有等待的线程继续向下执行。 
在判断计数器是否完成一次计数时它调用breakBarrier()方法:

1/**
2     * Sets current barrier generation as broken and wakes up everyone.
3     * Called only while holding lock.
4     */

5    private void breakBarrier() {
6        generation.broken = true;
7        count = parties;
8        trip.signalAll();
9    }


这个方法同样会更新标志并且唤醒所有等待的线程。

在接下来的整个for循环中,判断了当前线程是否被中断,计数器是否被破坏,等待是否超时。

  1. 如果等待超时则调用awaitNanos方法继续等待,该方法时Contition接口的实现类的一个方法,让线程在合适的时间进行等待或者在特定的时间内得到通知,继续执行,该方法内部实现复杂,笔者能力有限,这里就不进行分析了。有兴趣的话可以自己查看源码。 

 2. 它会判断所有线程是否都已经到达,如果所有线程已经执行完毕到达则进行下一次循环:

1//如果所有线程都已经到达或者被中断则计数完成,进入下一次循环
2                    if (g == generation && ! g.broken) {
3                        breakBarrier();
4                        throw ie;
5                    }


3.同时它也会判断是否是同一个线程,并且更新标志。

1//如果不是同一个线程,则返回index
2                if (g != generation)
3                    return index;


当该线程的所有任务都执行完毕后它就会释放锁。

至此,本文已经介绍完CyclicBarrier工具类的介绍,本人能力有限,如有不足之处还请指教。多谢!

原文出自【不清不慎的博客】

以上是关于JAVA并发包源码分析循环栅栏:CyclicBarrier的主要内容,如果未能解决你的问题,请参考以下文章

Java并发包源码分析

死磕 java并发包之AtomicInteger源码分析

死磕 java同步系列之CyclicBarrier源码解析——有图有真相

深入java并发包源码AQS的介绍与使用

Java并发包中CyclicBarrier的源码分析和使用

死磕 java并发包之LongAdder源码分析