CyclicBarrier源码分析

Posted 叶长风

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CyclicBarrier源码分析相关的知识,希望对你有一定的参考价值。

CyclicBarrier源码分析

上一篇讲到了使用CountDownLatch来阻塞主线程继续执行,然后想到了CyclicBarrier,CyclicBarrier则与CountDownLatch相反,CyclicBarrier一般是用来阻塞子线程执行,在这想到CyclicBarrier虽然用的比较多,但是对其工作机制,源码并不是太熟悉,因此此处写一篇CyclicBarrier的源码分析记录一下。

CyclicBarrier用法


写个小程序演示用法,如下:

CyclicBarrier barrier = new CyclicBarrier(2);
        System.out.println("等待线程数:" + barrier.getNumberWaiting());
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        executorService.execute(() -> 
            try 
                barrier.await();
                System.out.println(Thread.currentThread().getName() + " is running 1!");
             catch (Exception e) 
                e.printStackTrace();
            
        );
        System.out.println("等待线程数:" + barrier.getNumberWaiting());
        executorService.execute(() -> 
            try 
                barrier.await();
                System.out.println(Thread.currentThread().getName() + " is running 2!");
             catch (Exception e) 
                e.printStackTrace();
            
        );

这个程序与CountDownLatch的不同点就在于CountDownLatch一般用于阻塞主线程的执行,而CyclicBarrier用于阻塞子线程的执行,那么CountDownLatch和CyclicBarrier其实用法差不多,功能应该也是差不多,CountDownLatch是Count值递增到设定值后主线程继续执行,想必CyclicBarrier应该就是返过来,先设定count值,count值递减到0后线程继续执行,下面就来看下CyclicBarrier的源码。

CyclicBarrier源码


看下CyclicBarrier的初始化方式:

CyclicBarrier barrier = new CyclicBarrier(2);

进入CyclicBarrier的构造方法。

public CyclicBarrier(int parties) 
        this(parties, null);
    
public CyclicBarrier(int parties, Runnable barrierAction) 
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    

这里的操作就是讲初始值赋值给parties和count,另外对barrierCommand参数也进行了初始化操作,但是这里我们初始值为null,因此barrierCommand值也为null,暂时不讲这两parties和barrierCommand的用处,在后面再详细讲述。

现在来看CyclicBarrier的使用。

executorService.execute(() -> 
            try 
                barrier.await();
                System.out.println(Thread.currentThread().getName() + " is running 1!");
             catch (Exception e) 
                e.printStackTrace();
            
        );

主要阻塞子线程工作的代码段为:

barrier.await();

进入到await方法看看。

public int await() throws InterruptedException, BrokenBarrierException 
        try 
            return dowait(false, 0L);
         catch (TimeoutException toe) 
            throw new Error(toe); // cannot happen
        
    

/**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException 
        final ReentrantLock lock = this.lock;
        lock.lock();
        try 
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) 
                breakBarrier();
                throw new InterruptedException();
            

            int index = --count;
            if (index == 0)   // tripped
                boolean ranAction = false;
                try 
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                 finally 
                    if (!ranAction)
                        breakBarrier();
                
            

            // loop until tripped, broken, interrupted, or timed out
            for (;;) 
                try 
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                 catch (InterruptedException ie) 
                    if (g == generation && ! g.broken) 
                        breakBarrier();
                        throw ie;
                     else 
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    
                

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) 
                    breakBarrier();
                    throw new TimeoutException();
                
            
         finally 
            lock.unlock();
        
    

方法比较长,拆开一步步看。

final ReentrantLock lock = this.lock;
lock.lock();

首先第一步加锁,没有异议,对于这种多线程场景下,基本都是加锁来完成方法的调用。

接下来:

final Generation g = generation;

if (g.broken)
     throw new BrokenBarrierException();

if (Thread.interrupted()) 
        breakBarrier();
        throw new InterruptedException();

实例化Generation对象, 这个Generation对象比较简单,就是一个静态类,然后静态类中有一boolean变量,用来标记当前的barrier对象是否已经被重置或者中断。

private static class Generation 
        boolean broken = false;
    

继续回到上面这个方法,当broken字段为true,或者线程被中断后,则抛出异常。

这里看下breakBarrier方法。

private void breakBarrier() 
        generation.broken = true;
        count = parties;
        trip.signalAll();
    

这里breakBarrier方法做的事情就是讲generation中的broken字段置为true,parties值赋值给count,同时trip的signalAll()动作,看下trip对象。

/** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();

是Condition对象,这个在随后进行讲述,一般来说,Condition和CyclicBarrier是在一起配合使用。

在这breakBarrier动作基本就是通知其他等待线程barrier已经被破坏,不再按照之前设定条件执行。

我们继续回到之前的代码。

int index = --count;
if (index == 0)   // tripped
    boolean ranAction = false;
    try 
          final Runnable command = barrierCommand;
          if (command != null)
                command.run();
           ranAction = true;
           nextGeneration();
           return 0;
          finally 
              if (!ranAction)
                   breakBarrier();
           

这里的动作就是建立index变量,index值为count减一得到。

int index = --count;

随后,下面的代码块是在index为0时的操作,在获取到构造方法传入的barrierCommand后,进行类barrierCommand的运行,并重新对generation进行了实例化操作。

如果command为null,则就是最后一个线程到达了barrier的条件,这时就放开barrier。

finally 
              if (!ranAction)
                   breakBarrier();
           

接下来看没有达到barrier条件时的程序逻辑。

// loop until tripped, broken, interrupted, or timed out
            for (;;) 
                try 
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                 catch (InterruptedException ie) 
                    if (g == generation && ! g.broken) 
                        breakBarrier();
                        throw ie;
                     else 
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    
                

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) 
                    breakBarrier();
                    throw new TimeoutException();
                
            
         finally 
            lock.unlock();
        

这里的操作就是这个无限for循环,然后循环结束的条件便是超时、线程中断、broken字段为true或者是条件满足的情况。

// loop until tripped, broken, interrupted, or timed out
            for (;;) 
            

接着继续看下面的代码,如果timed字段为false时,则进行等待,否则等待配置的执行时长,中间如果发生InterruptedException异常,则结束等待,抛出异常。同时breakBarrier方法中也会进行唤醒其他线程的操作。

try 
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                 catch (InterruptedException ie) 
                    if (g == generation && ! g.broken) 
                        breakBarrier();
                        throw ie;
                     else 
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    
                

在等待时间满足后,如果broken为false,等待时间不小于0,或者异常中为其他中断异常,则继续for循环的动作,进行等待。

if (g.broken)
    throw new BrokenBarrierException();

if (g != generation)
      return index;

if (timed && nanos <= 0L) 
      breakBarrier();
      throw new TimeoutException();

这里这段代码应该是所有条件结束后,对循环条件再进行一次判断,如果generation不同,或者时间小于0之类的则结束循环。

CyclicBarrier就上面这些东西,和CountDownLatch类似,不过和CountDownLatch不同的是,CyclicBarrier还可以重置count结果,可以看一个方法。

public void reset() 
        final ReentrantLock lock = this.lock;
        lock.lock();
        try 
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
         finally 
            lock.unlock();
        
    

这个方法对count值和generation进行了重新赋值,这样CyclicBarrier就又可以重新开始了。

CyclicBarrier就讲述到这里。

以上是关于CyclicBarrier源码分析的主要内容,如果未能解决你的问题,请参考以下文章

CyclicBarrier源码分析

JDK源码分析通过源码分析CyclicBarrier

CyclicBarrier源码分析

源码分析-CyclicBarrier

Java并发包中CyclicBarrier的源码分析和使用

深入浅出Java并发编程指南「源码原理系列」让我们一起探索一下CyclicBarrier的技术原理和源码分析