jdk1.8 J.U.C并发源码阅读------CyclicBarrier源码解析
Posted Itzel_yuki
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jdk1.8 J.U.C并发源码阅读------CyclicBarrier源码解析相关的知识,希望对你有一定的参考价值。
一、继承关系
public class CyclicBarrier
功能:
实现n个线程(线程之间是独占锁的关系)相互等待,最后一个执行wait操作的线程需要执行指定的命令barrierCommand,并且还需要唤醒其他所有等待的线程,还要初始化相关参数,为下一轮相互等待做好准备。n个线程中任何一个线程发生中断,超时,则设置代标志generation.broken为true,其他线程检测到该标识说明此轮等待失败。
实现方式:
使用的是AQS中独占锁CLH队列+CONDITION队列的实现方式。当调用barrier.await方法时,该线程要获得锁,因此进入CLH队列队尾,当获取到锁后,执行condition.wait操作时,将该线程节点从CLH队列队首删除,添加到CONDITION队列中。将count-1个线程以上述方式逐个添加到CONDITIO队列中,当最后一个线程获取到锁后,执行barrierCommand任务,然后执行condition.signalAll方法,唤醒CONDITION队列中所有节点,CONDITION队列从队首开始逐个将node添加到CLH队列中,开始获取锁,执行完后继操作后从barrier.await中返回。
说明:
调用时不要求和lock结合使用,barrier内部的await方法中使用了lock和condition.await和signal操作。
二、成员变量
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();//独占锁
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();//条件,lock中的一个CONDITION队列
/** The number of parties */
private final int parties;//参与相互等待的线程数量
/* The command to run when tripped */
private final Runnable barrierCommand;//最后一个执行dowait的线程需要执行的代码
/** The current generation */
private Generation generation = new Generation();//下一批循环等待的代标志
private int count;//参与等待的线程数量
三、构造方法
//指定参与相互等待的线程的数量parties,
public CyclicBarrier(int parties, Runnable barrierAction)
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
public CyclicBarrier(int parties)
this(parties, null);
四、内部类
(1)Generation
private static class Generation
boolean broken = false;
五、方法说明
(1)await:
public int await() throws InterruptedException, BrokenBarrierException
try
return dowait(false, 0L);
catch (TimeoutException toe)
throw new Error(toe); // cannot happen
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try
final Generation g = generation;
//g.broken==true表示这一轮相互等待的线程中,有线程抛出了异常,则该线程在此处也要抛出异常
if (g.broken)
throw new BrokenBarrierException();
//当前线程是否发生了中断,是则调用breakBarrier设置generation的broken值,以便通知其他相互等待的线程有线程发生异常。
if (Thread.interrupted())
breakBarrier();
throw new InterruptedException();
int index = --count;
if (index == 0) // 最后一个执行dowait的线程执行此处的代码
boolean ranAction = false;
try
final Runnable command = barrierCommand;
//最后一个到达的线程执行barrierCommand
if (command != null)
command.run();
ranAction = true;
//该轮等待结束,可以开启下一轮等待,唤醒所有该轮中的阻塞线程
nextGeneration();
return 0;
finally
//如果ranAction为false,则该轮相互等待失败,调用breakBarrier通知
if (!ranAction)
breakBarrier();
// 循环,直到该线程被唤醒,或者超时,或者发生中断
for (;;)
try
//该线程执行await操作,释放该线程占有的锁,唤醒CLH队列中的后继节点
if (!timed)
trip.await();
//awaitNanos:加入时间等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
catch (InterruptedException ie)
//发生异常,若当前线程是该轮线程中第一个发生异常的线程,则执行breakBarrier通知其他线程
if (g == generation && ! g.broken)
breakBarrier();
throw ie;
else
//否则,该线程之前已经有线程发生异常,此处只需要中断当前线程即可
Thread.currentThread().interrupt();
//如果该轮相互等待失败,则抛出异常
if (g.broken)
throw new BrokenBarrierException();
//说明此轮等待已经结束
if (g != generation)
return index;
//超时,调用breakBarrier通知其他线程本轮相互等待失败
if (timed && nanos <= 0L)
breakBarrier();
throw new TimeoutException();
finally
//释放锁
lock.unlock();
//这一轮相互等待过程中如果某个线程发生异常,则该轮等待的标志generation.broken设置为true,通知其他相互等待的线程有线程抛出异常
private void breakBarrier()
generation.broken = true;
count = parties;
//唤醒CONDITION队列中所有线程
trip.signalAll();
//开启下一轮等待
private void nextGeneration()
//唤醒上一轮等待中的所有线程
trip.signalAll();
// 重新初始化count和generation
count = parties;
generation = new Generation();
(2)await
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException
return dowait(true, unit.toNanos(timeout));
(3)reset:重新初始化参数,开启新的一轮等待。
public void reset()
final ReentrantLock lock = this.lock;
lock.lock();
try
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
finally
lock.unlock();
六、使用示例
class Persion extends Thread
String name;
CyclicBarrier barrier;
int time;
public Persion(String name,CyclicBarrier barrier,int time)
super();
this.name=name;
this.barrier=barrier;
this.time=time;
@Override
public void run()
LockSupport.parkNanos(time*10000);
System.out.println(name+"来了,开始等待其他人。");
try
barrier.await();
catch (InterruptedException | BrokenBarrierException e)
// TODO Auto-generated catch block
e.printStackTrace();
System.out.println(name+"发言...");
public static void testCyclicBarrier()
CyclicBarrier barrier=new CyclicBarrier(5, new Runnable()
@Override
public void run()
// TODO Auto-generated method stub
System.out.println("所有人都到齐了,开始开会!");
);
Persion persion1=new Persion("张三", barrier, 1);
Persion persion2=new Persion("李四", barrier, 2);
Persion persion3=new Persion("王五", barrier, 3);
Persion persion4=new Persion("赵六", barrier, 1);
Persion persion5=new Persion("钱七", barrier, 2);
persion1.start();
persion2.start();
persion3.start();
persion4.start();
persion5.start();
输出结果:
张三来了,开始等待其他人。
钱七来了,开始等待其他人。
赵六来了,开始等待其他人。
王五来了,开始等待其他人。
李四来了,开始等待其他人。
所有人都到齐了,开始开会!
李四发言...
张三发言...
赵六发言...
钱七发言...
王五发言...
以上是关于jdk1.8 J.U.C并发源码阅读------CyclicBarrier源码解析的主要内容,如果未能解决你的问题,请参考以下文章
jdk1.8 J.U.C并发源码阅读------ReentrantLock源码解析
jdk1.8 J.U.C并发源码阅读------ReentrantLock源码解析
jdk1.8 J.U.C并发源码阅读------CountDownLatch源码解析
jdk1.8 J.U.C并发源码阅读------CountDownLatch源码解析