源码分析-CyclicBarrier

Posted 千念飞羽

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码分析-CyclicBarrier相关的知识,希望对你有一定的参考价值。

CyclicBarrier

从用法上来说,CyclicBarrier可能看出是CountDownLatch的高级版本,增加了重置的功能,对于多个线程的中断提供了通知的功能。

具体的用法通过api就有比较详细的介绍。

内部类Generation-如何实现重置功能的

首先CyclicBarrier内部有一个内部静态类Generation。当然在每个CyclicBarrier实例中也有一个Generation域

这个类只有一个内部域broken用来表示当前的屏障是否被打破了。

    private static class Generation 
        boolean broken = false;
    

Generation只在线程不中断的情况下用来判断CyclicBarrier的状态的。
是由于有count个线程调用了await来正常中断的——即所谓的开闸状态。
还是由于其他特殊原因打破了CyclicBarrier(也就是当前CyclicBarrier无效了)——即所谓的打破状态。

而如果需要重置也就是讲CyclicBarrier实例中的域来重新构建一个新的Generation就可以了。

工作原理

    private final ReentrantLock lock = new ReentrantLock();//所有方法都通过这个锁来同步。之所以不使用内置锁主要是因为需要抛出异常。此外这里需要的实际上是共享锁,而内置锁不能实现共享锁。
    private final Condition trip = lock.newCondition();//通过lock得到的一个状态变量
    private final int parties;//通过构造器传入的参数,表示总的等待线程的数量。
    private final Runnable barrierCommand;//当屏障正常打开后运行的程序,通过最后一个调用await的线程来执行。
    private Generation generation = new Generation();当前的Generation。每当屏障失效或者开闸之后都会自动替换掉。从而实现重置的功能。

锁、条件队列、状态变量、条件谓词之间的关系。

方法

最主要的就是await()方法。

实现的功能:

调用await()的线程会等待直到有足够数量的线程调用await——也就是开闸状态,

  • 当最后一个线程到达

或者出现下面的情况——也就是打破状态。

  • 有其他线程中断当前线程。则抛出interruptException
  • 指定了限时操作,并到达线程,则抛出TimeoutException
  • 如果barrier被重置,或者屏障处于打破状态,则抛出BrokenBarrierException

什么样的情况会出现打破状态?当任意等待线程抛出BrokenBarrierException的时候会使得当前屏障处于打破状态。

await方法是通过一个内部方法dowait来实现的。

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException 
        final ReentrantLock lock = this.lock;
        lock.lock();
        try 
            final Generation g = generation;

            if (g.broken)//如果当前Generation是处于打破状态则传播这个BrokenBarrierExcption
                throw new BrokenBarrierException();

            if (Thread.interrupted()) 
                breakBarrier();//如果当前线程被中断则使得当前generation处于打破状态,重置剩余count。并且唤醒状态变量。这时候其他线程会传播BrokenBarrierException.
                throw new InterruptedException();
            

           int index = --count;//尝试降低当前count
           if (index == 0)   // tripped//如果当前状态将为0,则Generation处于开闸状态。运行可能存在的command,设置下一个Generation。相当于每次开闸之后都进行了一次reset。
               boolean ranAction = false;
               try 
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();
                   ranAction = true;
                   nextGeneration();
                   return 0;
                finally 
                   if (!ranAction)//如果运行command失败也会导致当前屏障被打破。
                       breakBarrier();
               
           

            // loop until tripped, broken, interrupted, or timed out
            for (;;) 
                try 
                    if (!timed)//阻塞在当前的状态变量。
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                 catch (InterruptedException ie) 
                    if (g == generation && ! g.broken) //如果当前线程被中断了则使得屏障被打破。并抛出异常。
                        breakBarrier();
                        throw ie;
                     else 
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();//这种捕获了InterruptException之后调用Thread.currentThread().interrupt()是一种通用的方式。但是之前源码中好像都没有体现。我第一次见这个好像是java并发实践中。这样做的目的是什么?其实就是为了保存中断状态,从而让其他更高层次的代码注意到这个中断。但是需要注意的是这里需要其他代码予以配合才行否则这样做其实是比较危险的一种方式,因为这相当于吞了这个异常。
                    
                

                //从阻塞恢复之后,需要重新判断当前的状态。
                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) 
                    breakBarrier();
                    throw new TimeoutException();
                
            
         finally 
            lock.unlock();
        
    

此外再看下两个小过程:

这两个小过程当然是需要锁的,但是由于这两个方法只是通过其他方法调用,所以依然是在持有锁的范围内运行的。这两个方法都是对域进行操作。

nextGeneration实际上在屏障开闸之后重置状态。以待下一次调用。
breakBarrier实际上是在屏障打破之后设定打破状态,以唤醒其他线程并通知。

    private void nextGeneration() 
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    

    private void breakBarrier() 
        generation.broken = true;
        count = parties;
        trip.signalAll();
    

reset
reset方法比较简单。但是这里还是要注意一下要先打破当前屏蔽,然后再重建一个新的屏蔽。否则的话可能会导致信号丢失。

    public void reset() 
        final ReentrantLock lock = this.lock;
        lock.lock();
        try 
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
         finally 
            lock.unlock();
        
    

以上是关于源码分析-CyclicBarrier的主要内容,如果未能解决你的问题,请参考以下文章

concurrent同步屏障 CyclicBarrier & 源码分析

Java并发编程(十六):CyclicBarrier源码分析

Java并发编程(十六):CyclicBarrier源码分析

CyclicBarrier源码分析

Java Review - 并发编程_ 回环屏障CyclicBarrier原理&源码剖析

CyclicBarrier源码分析