25.CyclicBarrire的功能和作用
Posted 纵横千里,捭阖四方
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了25.CyclicBarrire的功能和作用相关的知识,希望对你有一定的参考价值。
CyclicBarrire的意思是可循环(Cyclic)使用的屏障Barrire,主要作用是让一组线程达到一个屏障(也可以称为同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才能继续往下执行,线性进入屏障通过CyclicBarrire的await()方法实现的。
1 基本使用
我们先写个例子看一下如何使用CyclicBarrire。
public class CyclicBarrierExample
public static void main(String[] args)
int parties = 4;
CyclicBarrier barrier = new CyclicBarrier(parties, () ->
System.out.println("所有线程执行完毕,继续处理其他任务");
);
for (int i = 0; i < parties; i++)
new ImportDataTask(barrier).start();
static class ImportDataTask extends Thread
private CyclicBarrier cyclicBarrier;
public ImportDataTask(CyclicBarrier cyclicBarrier)
this.cyclicBarrier = cyclicBarrier;
@Override
public void run()
try
Thread.sleep(1000);
System.out.println("线程 " + Thread.currentThread().getName() + "数据导入完毕,等待其他线程");
cyclicBarrier.await();
catch (Exception e)
e.printStackTrace();
该代码做的事情是:构建一个要求4个线程参与的CyclicBarrier实例,定义四个线程分别执行Writer写入,每个Writer线程执行写入完成后,调用cyclicBarrier.await()阻塞线程。当四个线程都调用await()方法之后,这四个线程都会被唤醒继续往下执行。
其中CyclicBarrier的构造参数代表参与的线程数量,当有线程调用await()方法时先阻塞线程,只有达到该数量的线程都调用await()方法后,这些线程才全部被唤醒。
我们通过下面的图示来进一步理解:
2 实现原理
CyclicBarrier包含了两层意思,第一个是前面说的屏障点,线程调用await()方法都会阻塞再屏障点,知道所有线程都达到屏障点再放行。第二个层面是Cyclic循环,当所有线程通过当前屏障点之后 ,又可以进入下一轮的屏障点进行等待,可以不断循环。
在CyclicBarrier中定义了两个int类型的变量,分别是parties和count,这两个变量的作用如下:
-
parties表示每次要求达到屏障点的线程数,只有满足指定数量的线程,所有线程才会被唤醒。
-
count用来实现内部的计数器,初始值就是parties,后续再每个线程调用await()方法时,会对count--,当count=0时会唤醒所有的线程。
以下是CyclicBarrier中定义的成员变量,可以看到,内部使用了重入锁和Condition,也就是说CyclicBarrier中的线程阻塞和唤醒是基于Condition实现的。
public class CyclicBarrier
private static class Generation
boolean broken = false;
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
private int count;
另外,CyclicBarrier有一个静态内部类Generation,该类的对象代表屏障点当前generation(代),每次当所有线程通过屏障点后,表示当前generation已经过去了,会进入下一个generation,CyclicBarrier用其实现循环等待。
await方法的代码如下:
public int await() throws InterruptedException, BrokenBarrierException
try
return dowait(false, 0L);
catch (TimeoutException toe)
throw new Error(toe); // cannot happen
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException
final ReentrantLock lock = this.lock;
lock.lock();//获得重入锁
try
final Generation g = generation;
//确认当前generation的barrier是否失效
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted())
breakBarrier();
throw new InterruptedException();
//统计已经到达当前generation的线程数量
int index = --count;
//如果是0,表示所有线程都达到了屏障点
if (index == 0) // tripped
boolean ranAction = false;
try
final Runnable command = barrierCommand;
if (command != null)
//如果CyclicBarrier的回调不为空,则直接触发回调
command.run();
ranAction = true;
//进入下一个屏障周期
nextGeneration();
return 0;
finally
//如果执行屏障点回调任务失败,则将屏障点失效
if (!ranAction)
breakBarrier();
// 循环等待,直到所有线程达到屏障点,或者屏障点失效,线程中断、等待超时
for (;;)
try
//是否带有等待超时时间,如果没有,则直接调用await()方法阻塞当前线程
if (!timed)
trip.await();
else if (nanos > 0L)
//否则采用超时等待
nanos = trip.awaitNanos(nanos);
catch (InterruptedException ie) //被其他线程通过interrupt()方法唤醒
//如果是当前generation且没有被broken,则让屏障失效并抛出异常
if (g == generation && ! g.broken)
breakBarrier();
throw ie;
else
Thread.currentThread().interrupt();
//有任何一个线程被中断时,都会调用breakBarrier()方法,而在该方法中会唤醒所有处于await()阻塞情况下的线程。
//如果其他线程被唤醒,那么也需要抛出异常
if (g.broken)
throw new BrokenBarrierException();
//被唤醒的generation和当前的不同,不做处理
if (g != generation)
return index;
//如果在等待超时之后被唤醒,说明还有线程没有达到屏障点,则让屏障点失效
if (timed && nanos <= 0L)
breakBarrier();
throw new TimeoutException();
finally
lock.unlock();
上面代码较长,整体逻辑如下:
-
正常情况下,线程调用cyclicBarrier.await()方法直接阻塞当前线程,所以在dowait()方法中调用trip.await()方法阻塞当前线程。
-
每个线程在调用cyclicBarrier.await()方法时,都会在代码中通过int index=-count对计数器进行递减,如果为0,则可以直接唤醒所有线程(nextGeneration()),并且如果异步回调任务barrierCommand不为空,则会同时执行该任务。
private void nextGeneration()
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
-
被trip.await()方法阻塞的线程,除了可以通过trip.signalAll()方法唤醒外,还可以被interrupt()方法唤醒的,这属于异常唤醒。被环形的通过g == generation && ! g.broken判断是否是当前generation,以及屏障点是否失效。如果没有失效,则调用breakBarrier()方法让屏障点失效。
private void breakBarrier()
generation.broken = true;
count = parties;
trip.signalAll();
被中断的线程调用breakBarrier()方法,表示让当前屏障点失效,并且唤醒所有被阻塞的线程。接着被唤醒的线程需要通过if(g.broken)判断屏障点是否失效,如果是则意味着所有被唤醒的线程都要抛出异常。
-
最后一种情况,被唤醒的线程可能会调用带有超时机制的阻塞方法 nacos=trip.awaitNanos(nacos),所以如果超过指定时间后相关线程还没有到达当前generation的屏障点,则同样可以通过breakBarrier()让屏障点失效。
最后还有一个reset()方法再看一下。
public void reset()
final ReentrantLock lock = this.lock;
lock.lock();
try
breakBarrier(); //中断当前的 generation
nextGeneration(); // 开始新的 generation
finally
lock.unlock();
这里的功能非常简单就是把原本阻塞在屏障点中的线程全部唤醒,然后进入下一个generation周期。
整体来看,CyclicBarrier的代码非常精简,实现的逻辑也不复杂,核心思想是通过Condition实现指定条件的线程等待和唤醒。通过CyclicBarrier的源码分析,我们可以更好的理解Condition作为基础组件如何灵活应用在不同的场景中。
以上是关于25.CyclicBarrire的功能和作用的主要内容,如果未能解决你的问题,请参考以下文章