Java多线程同步工具类之CyclicBarrier
Posted dafanjoy
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java多线程同步工具类之CyclicBarrier相关的知识,希望对你有一定的参考价值。
一、CyclicBarrier使用
CyclicBarrier从字面上可以直接理解为线程运行的屏障,它可以让一组线程执行到一个共同的屏障点时被阻塞,直到最后一个线程执行到指定位置,你设置的执行线程就会触发运行;同时CyclicBarrier相比与CountDownLatch,它是可以被重置的;下面我们通过一个简单例子看下CyclicBarrier的使用;
实例化一个CyclicBarrier对象并传入你要控制的线程内部;
public static void main(String[] args) CyclicBarrier cb = new CyclicBarrier(3, new Runnable() public void run() System.out.println("所有线程集合"); ); for (int i = 0; i < 3; i++) new CyclicBarrierThread(i + "", cb).start();
计数线程代码,每当计数到偶数时调用CyclicBarrier的await()方法
public class CyclicBarrierThread extends Thread private CyclicBarrier barrier; private String name; private int count; public CyclicBarrierThread(String name,CyclicBarrier barrier) this.name=name; this.barrier=barrier; this.count=0; public void run() try for(int i=0;i<10;i++) Thread.sleep(100); count++; System.out.println(name+"号线程---"+Thread.currentThread().getName()+"开始计数:"+count); if(count%2==0) //每计数到偶数次时集合一次 barrier.await(); System.out.println(name+"号线程---"+Thread.currentThread().getName()+"集合完毕,继续计数"); catch (Exception e) // TODO Auto-generated catch block e.printStackTrace();
查看代码输出
2号线程---Thread-2开始计数:1 0号线程---Thread-0开始计数:1 1号线程---Thread-1开始计数:1 2号线程---Thread-2开始计数:2 1号线程---Thread-1开始计数:2 0号线程---Thread-0开始计数:2 所有线程集合 2号线程---Thread-2集合完毕,继续计数 1号线程---Thread-1集合完毕,继续计数 0号线程---Thread-0集合完毕,继续计数 2号线程---Thread-2开始计数:3 1号线程---Thread-1开始计数:3 0号线程---Thread-0开始计数:3 2号线程---Thread-2开始计数:4 0号线程---Thread-0开始计数:4 1号线程---Thread-1开始计数:4 所有线程集合 1号线程---Thread-1集合完毕,继续计数 2号线程---Thread-2集合完毕,继续计数 0号线程---Thread-0集合完毕,继续计数 0号线程---Thread-0开始计数:5 2号线程---Thread-2开始计数:5 1号线程---Thread-1开始计数:5 0号线程---Thread-0开始计数:6 1号线程---Thread-1开始计数:6 2号线程---Thread-2开始计数:6 所有线程集合 2号线程---Thread-2集合完毕,继续计数 0号线程---Thread-0集合完毕,继续计数 1号线程---Thread-1集合完毕,继续计数 0号线程---Thread-0开始计数:7 1号线程---Thread-1开始计数:7 2号线程---Thread-2开始计数:7 1号线程---Thread-1开始计数:8 0号线程---Thread-0开始计数:8 2号线程---Thread-2开始计数:8 所有线程集合 2号线程---Thread-2集合完毕,继续计数 0号线程---Thread-0集合完毕,继续计数 1号线程---Thread-1集合完毕,继续计数 0号线程---Thread-0开始计数:9 1号线程---Thread-1开始计数:9 2号线程---Thread-2开始计数:9 1号线程---Thread-1开始计数:10 0号线程---Thread-0开始计数:10 2号线程---Thread-2开始计数:10 所有线程集合 1号线程---Thread-1集合完毕,继续计数 2号线程---Thread-2集合完毕,继续计数 0号线程---Thread-0集合完毕,继续计数
通过输出结果可以看到,计数线程每计数到偶数次时使用CyclicBarrier的await()方法,线程都会进入阻塞等待的状态,直到最后一个线程到达屏障点时,触发你定义的执行线程,而且CyclicBarrier的await()方法是可以重复使用的。
二、CyclicBarrier源码分析
下面我们就对CyclicBarrier内部的源码实现进行一些分析与总结
1、CyclicBarrier的构造
首先看下CyclicBarrier的构造函数
public CyclicBarrier(int parties, Runnable barrierAction) if (parties <= 0) throw new IllegalArgumentException(); //拦截的线程数量 this.parties = parties; //用于计数的count值,每有一个线程执行到屏障点,就会递减1 this.count = parties; //定义的拦截线程 this.barrierCommand = barrierAction;
CyclicBarrier的构造函数很简单就是接收你要拦截的线程数量与定义的执行线程。
2、await方法
public int await() throws InterruptedException, BrokenBarrierException try return dowait(false, 0L); catch (TimeoutException toe) throw new Error(toe); // cannot happen
我们看下具体实现dowait方法的实现
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException //获取可重入锁 final ReentrantLock lock = this.lock; //加锁 lock.lock(); try //CyclicBarrier内部定义的一个Generation类 final Generation g = generation; //判断Generation的broken状态 if (g.broken) throw new BrokenBarrierException(); //如果线程被中断 if (Thread.interrupted()) //Generation的broken置为true,count值重置,并唤醒所有线程 breakBarrier(); throw new InterruptedException(); //count值减一 int index = --count; if (index == 0) // 如果conunt为0,说明最后一个线程到大屏障 boolean ranAction = false; try final Runnable command = barrierCommand; if (command != null) command.run();//执行你传入的线程 ranAction = true; nextGeneration();//唤醒所有阻塞的线程,同时重置count值与Generation return 0; finally if (!ranAction) //拦截线程没有正常执行,唤醒所有线程,同时重置count值,Generation的broken置为true breakBarrier(); // loop until tripped, broken, interrupted, or timed out for (;;) try //是否设置阻塞的超时时间 if (!timed) //释放当前锁 trip.await();//false 表示不设置,一直阻塞 else if (nanos > 0L) nanos = trip.awaitNanos(nanos);//true 设置阻塞的超时时间 catch (InterruptedException ie) if (g == generation && ! g.broken) breakBarrier(); throw ie; else // We‘re about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) breakBarrier(); throw new TimeoutException(); finally //释放锁 lock.unlock();
dowait方法的实现流程是很清晰的,通过ReentrantLock的Condition接口与count值相互配合,主要完成以下功能:
1、当需要拦截的线程到达屏障点调用await方法后获取ReentrantLock锁,保证线程安全;
2、检查count值是否为0,判断是否是最后一个线程到达屏障,如果是的话执行需要触发执行的线程,调用Condition的signalAll方法唤醒所有阻塞的线程,并重置count值与Generation类,保障CyclicBarrier的重复可用;
3、如果不是最后一个线程的话,根据传入的参数调用Condition的await方法释放锁资源并进入阻塞等待,直到被唤醒;
3、reset方法
可以用来主动重置CyclicBarrier的状态
public void reset() final ReentrantLock lock = this.lock; lock.lock(); try //generation.broken设置为true,唤醒所有线程,count值重置 breakBarrier(); nextGeneration(); finally lock.unlock(); private void nextGeneration() // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); private void breakBarrier() generation.broken = true; count = parties; trip.signalAll();
breakBarrier()与nextGeneration(),这两个方法的主要区别就在于前者会把generation.broken设置为true,也就是说如果调用reset方法主动重置CyclicBarrier类的状态,当前正在使用CyclicBarrier类同步的线程都会被唤醒或抛出异常;
4、getNumberWaiting方法
public int getNumberWaiting() final ReentrantLock lock = this.lock; lock.lock(); try return parties - count; finally lock.unlock();
很明显getNumberWaiting方法使用来获取当前已经运行至屏蔽点并阻塞等待的线程数量的;
三、总结
通过上面分析可以看到CyclicBarrier的实现原理相对还是比较简单与清晰的,主要是基于ReentrantLock与计数器相结合来实现多个线程的同步控制的。以上就是对CyclicBarrier类的使用与内部实现进行的分析,其中如有不足与不正确的地方还望指出与海涵。
关注微信公众号,查看更多技术文章。
以上是关于Java多线程同步工具类之CyclicBarrier的主要内容,如果未能解决你的问题,请参考以下文章