CyclicBarrier源码剖析

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CyclicBarrier源码剖析相关的知识,希望对你有一定的参考价值。

CyclicBarrier是java.util.concurrent包中提供的同步工具。通过这个工具我们可以实现n个线程相互等待。我们可以通过参数指定达到公共屏障点之后的行为。

先上源码:

 

技术分享
  1 package java.util.concurrent;
  2 import java.util.concurrent.locks.*;
  3 
  4 public class CyclicBarrier {
  5 
  6     private static class Generation {
  7         boolean broken = false;
  8     }
  9 
 10     private final ReentrantLock lock = new ReentrantLock();
 11     private final Condition trip = lock.newCondition();
 12     private final int parties;
 13     private final Runnable barrierCommand;
 14     private Generation generation = new Generation();
 15     private int count;
 16 
 17     private void nextGeneration() {
 18         // signal completion of last generation
 19         trip.signalAll();
 20         // set up next generation
 21         count = parties;
 22         generation = new Generation();
 23     }
 24 
 25 
 26     private void breakBarrier() {
 27         generation.broken = true;
 28         count = parties;
 29         trip.signalAll();
 30     }
 31 
 32     private int dowait(boolean timed, long nanos) 
 33             throws InterruptedException, BrokenBarrierException, TimeoutException {
 34         final ReentrantLock lock = this.lock;
 35         lock.lock();
 36         try {
 37             final Generation g = generation;
 38 
 39             //小概率事件:该线程在等待锁的过程中,barrier被破坏
 40             if (g.broken)
 41                 throw new BrokenBarrierException();
 42 
 43             //小概率事件:该线程在等待锁的过程中被中断
 44             if (Thread.interrupted()) {
 45                 breakBarrier();
 46                 throw new InterruptedException();
 47             }
 48 
 49            int index = --count;
 50            //当有parties个线程到达barrier
 51            if (index == 0) {  // tripped
 52                 boolean ranAction = false;
 53                 try {
 54                    final Runnable command = barrierCommand;
 55                    //如果设置了barrierCommand,令最后到达的barrier的线程执行它
 56                    if (command != null)
 57                         command.run();
 58                     ranAction = true;
 59                     nextGeneration();
 60                     return 0;
 61                } finally {
 62                     //注意:当执行barrierCommand出现异常时,ranAction派上用场
 63                     if (!ranAction)
 64                         breakBarrier();
 65                }
 66            }
 67 
 68             // loop until tripped, broken, interrupted, or timed out
 69             for (;;) {
 70                 try {
 71                     if (!timed)
 72                         trip.await();
 73                     else if (nanos > 0L)
 74                         //注意:nanos值标识了是否超时,后续用这个nanos值判断是否breakBarrier
 75                         nanos = trip.awaitNanos(nanos);
 76                 } catch (InterruptedException ie) {
 77                     if (g == generation && ! g.broken) {
 78                         breakBarrier();
 79                         throw ie;
 80                     } else {
 81                         //小概率事件:该线程被中断,进入锁等待队列
 82                         //在等待过程中,另一个线程更新或破坏了generation
 83                         //当该线程获取锁之后,应重置interrupt标志而不是抛出异常
 84                         //原因在于:它中断的太晚了,generation已更新或破坏,它抛出InterruptedException的时机已经过去,
 85                         //两种情况:
 86                         //①g被破坏。已经有一个线程抛出了InterruptedException(也只能由第一个抛),与它同时等待的都抛BrokenBarrierException(后续检查broken标志会抛)。
 87                         //②g被更新:此时抛异常没意义(后续检查g更新后会return index),这里重置interrupt标志,让线程继续执行,让这个标志由上层处理
 88                         Thread.currentThread().interrupt();
 89                     }
 90                 }
 91 
 92                 //barrier被破坏,抛出异常
 93                 if (g.broken)
 94                     throw new BrokenBarrierException();
 95                 
 96                 //barrier正常进入下一循环,上一代await的线程继续执行
 97                 if (g != generation)
 98                     return index;
 99                 
100                 //只要有一个超时,就breakBarrier,后续线程抛的就是barrier损坏异常
101                 if (timed && nanos <= 0L) {
102                     breakBarrier();
103                     throw new TimeoutException();
104                 }
105             }
106         } finally {
107             lock.unlock();
108         }
109     }
110 
111 
112     public CyclicBarrier(int parties, Runnable barrierAction) {
113         if (parties <= 0) throw new IllegalArgumentException();
114         this.parties = parties;
115         this.count = parties;
116         this.barrierCommand = barrierAction;
117     }
118 
119     public CyclicBarrier(int parties) {
120         this(parties, null);
121     }
122 
123 
124     public int getParties() {
125         return parties;
126     }
127 
128 
129     public int await() throws InterruptedException, BrokenBarrierException {
130         try {
131             return dowait(false, 0L);
132         } catch (TimeoutException toe) {
133             throw new Error(toe); // cannot happen;
134         }
135     }
136 
137 
138     public int await(long timeout, TimeUnit unit)
139         throws InterruptedException,
140                BrokenBarrierException,
141                TimeoutException {
142         return dowait(true, unit.toNanos(timeout));
143     }
144 
145     
146     public boolean isBroken() {
147         final ReentrantLock lock = this.lock;
148         lock.lock();
149         try {
150             return generation.broken;
151         } finally {
152             lock.unlock();
153         }
154     }
155 
156     public void reset() {
157         final ReentrantLock lock = this.lock;
158         lock.lock();
159         try {
160             breakBarrier();   // break the current generation
161             nextGeneration(); // start a new generation
162         } finally {
163             lock.unlock();
164         }
165     }
166 
167     public int getNumberWaiting() {
168         final ReentrantLock lock = this.lock;
169         lock.lock();
170         try {
171             return parties - count;
172         } finally {
173             lock.unlock();
174         }
175     }
176 }
View Code

 

 

我们先来看一下CyclicBarrier的成员变量:

1 private final ReentrantLock lock = new ReentrantLock();
2 private final Condition trip = lock.newCondition();
3 private final int parties;
4 private final Runnable barrierCommand;
5 private Generation generation = new Generation();
6 private int count;

CyclicBarrier是通过独占锁lock和Condition对象trip来实现的,成员parties表示必须有parties个线程到达barrier,成员barrierCommand表示当parties个线程到达之后要执行的代码,成员count表示离触发barrierCommand还差count个线程(还有count个线程未到达barrier),成员generation表示当前的“代数”,“cyclic”表示可循环使用,generation是对一次循环的标识。注意:Generation是CyclicBarrier的一个私有内部类,他只有一个成员变量来标识当前的barrier是否已“损坏”:

1 private static class Generation {
2     boolean broken = false;
3 }

 

构造函数

 1 public CyclicBarrier(int parties, Runnable barrierAction) {
 2     if (parties <= 0) throw new IllegalArgumentException();
 3     this.parties = parties;
 4     this.count = parties;
 5     this.barrierCommand = barrierAction;
 6 }
 7 
 8 public CyclicBarrier(int parties) {
 9     this(parties, null);
10 }

CyclicBarrier提供了两种构造函数,没有指定barrierCommand的构造函数是调用第二个构造函数实现的。第二个构造函数有两个参数:parties和barrierAction,分别用来初始化成员parties和barrierCommand。注意,parties必须大于0,否则会抛出IllegalArgumentException。

 

await()方法

1 public int await() throws InterruptedException, BrokenBarrierException {
2     try {
3         return dowait(false, 0L);
4     } catch (TimeoutException toe) {
5      throw new Error(toe); // cannot happen;
6     }
7 }

await方法是由调用dowait方法实现的,两个参数分别代表是否定时等待和等待的时长。

 

doawait()方法

 1     private int dowait(boolean timed, long nanos) 
 2             throws InterruptedException, BrokenBarrierException, TimeoutException {
 3         final ReentrantLock lock = this.lock;
 4         lock.lock();
 5         try {
 6             final Generation g = generation;
 7 
 8             //小概率事件:该线程在等待锁的过程中,barrier被破坏
 9             if (g.broken)
10                 throw new BrokenBarrierException();
11 
12             //小概率事件:该线程在等待锁的过程中被中断
13             if (Thread.interrupted()) {
14                 breakBarrier();
15                 throw new InterruptedException();
16             }
17 
18            int index = --count;
19            //当有parties个线程到达barrier
20            if (index == 0) {  // tripped
21                 boolean ranAction = false;
22                 try {
23                    final Runnable command = barrierCommand;
24                    //如果设置了barrierCommand,令最后到达的barrier的线程执行它
25                    if (command != null)
26                         command.run();
27                     ranAction = true;
28                     nextGeneration();
29                     return 0;
30                } finally {
31                     //注意:当执行barrierCommand出现异常时,ranAction派上用场
32                     if (!ranAction)
33                         breakBarrier();
34                }
35            }
36 
37             // loop until tripped, broken, interrupted, or timed out
38             for (;;) {
39                 try {
40                     if (!timed)
41                         trip.await();
42                     else if (nanos > 0L)
43                         //注意:nanos值标识了是否超时,后续用这个nanos值判断是否breakBarrier
44                         nanos = trip.awaitNanos(nanos);
45                 } catch (InterruptedException ie) {
46                     if (g == generation && ! g.broken) {
47                         breakBarrier();
48                         throw ie;
49                     } else {
50                         //小概率事件:该线程被中断,进入锁等待队列
51                         //在等待过程中,另一个线程更新或破坏了generation
52                         //当该线程获取锁之后,应重置interrupt标志而不是抛出异常
53                         //原因在于:它中断的太晚了,generation已更新或破坏,它抛出InterruptedException的时机已经过去,
54                         //两种情况:
55                         //①g被破坏:已有一个线程抛出InterruptedException(只能由第一个抛),与它同时等待的都抛BrokenBarrierException(后续检查broken标志会抛)。
56                         //②g被更新:此时抛异常没意义(后续检查g更新后会return index),这里重置interrupt标志,让线程继续执行,让这个标志由上层处理
57                         Thread.currentThread().interrupt();
58                     }
59                 }
60 
61                 //barrier被破坏,抛出异常
62                 if (g.broken)
63                     throw new BrokenBarrierException();
64                 
65                 //barrier正常进入下一循环,上一代await的线程继续执行
66                 if (g != generation)
67                     return index;
68                 
69                 //只要有一个超时,就breakBarrier,后续线程抛的就是barrier损坏异常
70                 if (timed && nanos <= 0L) {
71                     breakBarrier();
72                     throw new TimeoutException();
73                 }
74             }
75         } finally {
76             lock.unlock();
77         }
78     }

dowait方法是CyclicBarrier的精华。应该重点来理解。

方法开头首先申请锁,然后做了两个判断:g.broken和Thread.interrupted(),这两个判断是分别处理两种小概率的事件:①该线程在等待锁的过程中,barrier被破坏②该线程在等待锁的过程中被中断。这两个事件应抛出相应的异常。接下来dowait方法修改了令count减1,如果此时count减为0,说明已经有parties个线程到达barrier,这时由最后到达barrier的线程去执行barrierCommand。注意,这里设置了一个布尔值ranAction,作用是来标识barrierCommand是否被正确执行完毕,如果执行失败,finally中会执行breakBarrier操作。如果count尚未减为0,则在Condition对象trip上执行await操作,注意:这里有一个InterruptedException的catch子句。当前线程在await中被中断时,会抛出InterruptedException,这时候如果g==generation&&!g.broken的话,我们执行breakBarrier操作,同时抛出这个异常;如果g!=generation或者g.broken==true的话,我们的操作是重置interrupt标志而不是抛出这个异常。这么做的原因我们分两种情况讨论:

①g被破坏,这也是一个小概率事件,当前线程被中断后进入锁等待队列,此时另一个线程由于某种原因(超时或者被中断)在他之前获取了锁并执行了breakBarrier方法,那么当前线程持有锁之后就不应再抛InterruptedException,逻辑上应该处理barrier被破坏事件,事实上在后续g.broken的检查中,他会抛出一个BrokenBarrierException。而当前的InterruptedException被我们捕获却没有做出处理,所以执行interrupt方法重置中断标志,交由上层程序处理。

②g被更新:说明当前线程在即将完成等待之际被中断,此时抛异常没意义(后续检查g更新后会return index),这里重置interrupt标志,让线程继续执行,让这个标志由上层处理。

后续对g.broken和g!=generation的判断,分表代表了被唤醒线程(非最后一个到达barrier的线程,也不是被中断或第一个超时的线程)的两种退出方法的方式:第一种是以barrier被破坏告终(然后抛异常),第二个是barrier等到parties个线程,寿终正寝(返回该线程的到达次序index)。

最后一个if是第一个超时线程执行breakBarrier操作并跑出异常。最后finally子句要释放锁。

至此,整个doawait方法流程就分析完毕了,我们可以发现,在barrier上等待的线程,如果以抛异常结束的话,只有第一个线程会抛InterruptedException或TimeoutException并执行breakBarrier操作,其他等待线程只能抛BrokenBarrierException,逻辑上这也是合理的:一个barrier只能因超时或中断被破坏一次。

 1 private void nextGeneration() {
 2     trip.signalAll();
 3     count = parties;
 4     generation = new Generation();
 5 }
 6 
 7 private void breakBarrier() {
 8     generation.broken = true;
 9     count = parties;
10     trip.signalAll();
11 }

doawait方法中用到的nextGeneration方法将所有等待线程唤醒,更新generation对象,复位count,进入下一轮任务。breakBarrier方法将generation状态值为broken,复位count(这个复位看上去没有用,但实际上,在broken之后reset之前,如果调用getNumberWaiting方法查看等待线程数的话,复位count是合理的),并唤醒所有等待线程。在调用reset更新generation之前,barrier将处于不可用状态。

 

reset()方法

 1 public void reset() {
 2     final ReentrantLock lock = this.lock;
 3     lock.lock();
 4     try {
 5         breakBarrier();   // break the current generation
 6         nextGeneration(); // start a new generation
 7     } finally {
 8         lock.unlock();
 9     }
10 }

reset方法先break当执行breakBarrier操作(如果有线程在barrier上等待,调用reset会导致BrokenBarrierException),再更新generation对象。

 

以上是关于CyclicBarrier源码剖析的主要内容,如果未能解决你的问题,请参考以下文章

Java Review - 并发编程_ 回环屏障CyclicBarrier原理&源码剖析

Java Review - 并发编程_ 回环屏障CyclicBarrier原理&源码剖析

AQS源码剖析第三篇--共享模式

JDKJDK源码分析-CyclicBarrier

源码分析:CyclicBarrier 之循环栅栏

JDK 源码解析 —— CyclicBarrier