25.CyclicBarrire的功能和作用

Posted 纵横千里,捭阖四方

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了25.CyclicBarrire的功能和作用相关的知识,希望对你有一定的参考价值。

CyclicBarrire的意思是可循环(Cyclic)使用的屏障Barrire,主要作用是让一组线程达到一个屏障(也可以称为同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才能继续往下执行,线性进入屏障通过CyclicBarrire的await()方法实现的。

1 基本使用

我们先写个例子看一下如何使用CyclicBarrire。

public class CyclicBarrierExample 
    public static void main(String[] args) 
        int parties = 4;
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> 
            System.out.println("所有线程执行完毕,继续处理其他任务");
        );
        for (int i = 0; i < parties; i++) 
            new ImportDataTask(barrier).start();
        

    

    static class ImportDataTask extends Thread 
        private CyclicBarrier cyclicBarrier;

        public ImportDataTask(CyclicBarrier cyclicBarrier) 
            this.cyclicBarrier = cyclicBarrier;
        

        @Override
        public void run() 
            try 
                Thread.sleep(1000);
                System.out.println("线程 " + Thread.currentThread().getName() + "数据导入完毕,等待其他线程");
                cyclicBarrier.await();
             catch (Exception e) 
                e.printStackTrace();
            
        
    

该代码做的事情是:构建一个要求4个线程参与的CyclicBarrier实例,定义四个线程分别执行Writer写入,每个Writer线程执行写入完成后,调用cyclicBarrier.await()阻塞线程。当四个线程都调用await()方法之后,这四个线程都会被唤醒继续往下执行。

其中CyclicBarrier的构造参数代表参与的线程数量,当有线程调用await()方法时先阻塞线程,只有达到该数量的线程都调用await()方法后,这些线程才全部被唤醒。

我们通过下面的图示来进一步理解:

2 实现原理

CyclicBarrier包含了两层意思,第一个是前面说的屏障点,线程调用await()方法都会阻塞再屏障点,知道所有线程都达到屏障点再放行。第二个层面是Cyclic循环,当所有线程通过当前屏障点之后 ,又可以进入下一轮的屏障点进行等待,可以不断循环。

在CyclicBarrier中定义了两个int类型的变量,分别是parties和count,这两个变量的作用如下:

  • parties表示每次要求达到屏障点的线程数,只有满足指定数量的线程,所有线程才会被唤醒。

  • count用来实现内部的计数器,初始值就是parties,后续再每个线程调用await()方法时,会对count--,当count=0时会唤醒所有的线程。

以下是CyclicBarrier中定义的成员变量,可以看到,内部使用了重入锁和Condition,也就是说CyclicBarrier中的线程阻塞和唤醒是基于Condition实现的。

public class CyclicBarrier 
    private static class Generation 
        boolean broken = false;
    

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private final Runnable barrierCommand;
    private Generation generation = new Generation();

    private int count;

另外,CyclicBarrier有一个静态内部类Generation,该类的对象代表屏障点当前generation(代),每次当所有线程通过屏障点后,表示当前generation已经过去了,会进入下一个generation,CyclicBarrier用其实现循环等待。

await方法的代码如下:

public int await() throws InterruptedException, BrokenBarrierException 
    try 
        return dowait(false, 0L);
     catch (TimeoutException toe) 
        throw new Error(toe); // cannot happen
    


private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException 
    final ReentrantLock lock = this.lock;
    lock.lock();//获得重入锁
    try 
        final Generation g = generation;
//确认当前generation的barrier是否失效
        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) 
            breakBarrier();
            throw new InterruptedException();
        
//统计已经到达当前generation的线程数量
        int index = --count;
        //如果是0,表示所有线程都达到了屏障点
        if (index == 0)   // tripped
            boolean ranAction = false;
            try 
                final Runnable command = barrierCommand;
                if (command != null)
                //如果CyclicBarrier的回调不为空,则直接触发回调
                    command.run();
                ranAction = true;
                //进入下一个屏障周期
                nextGeneration();
                return 0;
             finally 
            //如果执行屏障点回调任务失败,则将屏障点失效
                if (!ranAction)
                    breakBarrier();
            
        

        // 循环等待,直到所有线程达到屏障点,或者屏障点失效,线程中断、等待超时
        for (;;) 
            try 
            //是否带有等待超时时间,如果没有,则直接调用await()方法阻塞当前线程
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                //否则采用超时等待
                    nanos = trip.awaitNanos(nanos);
             catch (InterruptedException ie)    //被其他线程通过interrupt()方法唤醒
         //如果是当前generation且没有被broken,则让屏障失效并抛出异常
                if (g == generation && ! g.broken) 
                    breakBarrier();
                    throw ie;
                 else 
                    Thread.currentThread().interrupt();
                
            
//有任何一个线程被中断时,都会调用breakBarrier()方法,而在该方法中会唤醒所有处于await()阻塞情况下的线程。
//如果其他线程被唤醒,那么也需要抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
//被唤醒的generation和当前的不同,不做处理
            if (g != generation)
                return index;
//如果在等待超时之后被唤醒,说明还有线程没有达到屏障点,则让屏障点失效
            if (timed && nanos <= 0L) 
                breakBarrier();
                throw new TimeoutException();
            
        
     finally 
        lock.unlock();
    

上面代码较长,整体逻辑如下:

  • 正常情况下,线程调用cyclicBarrier.await()方法直接阻塞当前线程,所以在dowait()方法中调用trip.await()方法阻塞当前线程。

  • 每个线程在调用cyclicBarrier.await()方法时,都会在代码中通过int index=-count对计数器进行递减,如果为0,则可以直接唤醒所有线程(nextGeneration()),并且如果异步回调任务barrierCommand不为空,则会同时执行该任务。

private void nextGeneration() 
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
  • 被trip.await()方法阻塞的线程,除了可以通过trip.signalAll()方法唤醒外,还可以被interrupt()方法唤醒的,这属于异常唤醒。被环形的通过g == generation && ! g.broken判断是否是当前generation,以及屏障点是否失效。如果没有失效,则调用breakBarrier()方法让屏障点失效。

private void breakBarrier() 
    generation.broken = true;
    count = parties;
    trip.signalAll();

被中断的线程调用breakBarrier()方法,表示让当前屏障点失效,并且唤醒所有被阻塞的线程。接着被唤醒的线程需要通过if(g.broken)判断屏障点是否失效,如果是则意味着所有被唤醒的线程都要抛出异常。

  • 最后一种情况,被唤醒的线程可能会调用带有超时机制的阻塞方法 nacos=trip.awaitNanos(nacos),所以如果超过指定时间后相关线程还没有到达当前generation的屏障点,则同样可以通过breakBarrier()让屏障点失效。

最后还有一个reset()方法再看一下。

public void reset() 
    final ReentrantLock lock = this.lock;
    lock.lock();
    try 
        breakBarrier();   //中断当前的 generation
        nextGeneration(); // 开始新的 generation
     finally 
        lock.unlock();
    

这里的功能非常简单就是把原本阻塞在屏障点中的线程全部唤醒,然后进入下一个generation周期。

整体来看,CyclicBarrier的代码非常精简,实现的逻辑也不复杂,核心思想是通过Condition实现指定条件的线程等待和唤醒。通过CyclicBarrier的源码分析,我们可以更好的理解Condition作为基础组件如何灵活应用在不同的场景中。

以上是关于25.CyclicBarrire的功能和作用的主要内容,如果未能解决你的问题,请参考以下文章

内存屏障

聊一聊DTM子事务屏障功能之SQL Server版

回环屏障CyclicBarrier

java CyclicBarrier同步屏障

Java基础和JUC常见问题

Java基础和JUC常见问题