JUC系列同步工具类之CyclicBarrier
Posted 顧棟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC系列同步工具类之CyclicBarrier相关的知识,希望对你有一定的参考价值。
同步屏障 CyclicBarrier
文章目录
"循环屏障"是一种同步辅助工具,它允许一组线程相互等待以达到共同的屏障点。 CyclicBarriers 在涉及固定大小的线程组的程序中很有用,这些线程组必须偶尔相互等待。 屏障被称为循环的,因为它可以在等待线程被释放后重新使用。
CyclicBarrier 支持一个可选的 Runnable 命令,该命令在每个屏障点运行一次,在队伍中的最后一个线程到达之后,但在任何线程被释放之前。 此屏障操作对于在任何一方继续之前更新共享状态很有用。
示例
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.*;
/**
* 分布计算再合并
*/
public class CyclicBarrierDemo2 implements Runnable
/**
* 当最后一个线程到达屏障点,执行此方法,汇总各个线程的值
*/
@Override
public void run()
int result = 0;
// 遍历
for (Map.Entry<String, Integer> ss : s.entrySet())
result += ss.getValue();
s.put("result", result);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] count value [" + result + "].");
/**
* 定义一个需要4个线程的同步屏障,并在最后一个到达屏障的线程后,执行当前类的run方法
*/
private final CyclicBarrier c = new CyclicBarrier(4, this);
/**
* 4个线程的线程池
*/
private final Executor executor = Executors.newFixedThreadPool(4);
/**
* 用来存放各个线程运算结果的值
*/
private final ConcurrentHashMap<String, Integer> s = new ConcurrentHashMap<>();
/**
* 计算api 4个线程分别向变量s中塞值
*/
private void count()
for (int i = 0; i < 4; i++)
int finalI = i;
executor.execute(() ->
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] put value [" + finalI + "] and await.");
s.put(Thread.currentThread().getName(), finalI);
try
c.await();
catch (InterruptedException | BrokenBarrierException e)
e.printStackTrace();
);
public static void main(String[] args)
CyclicBarrierDemo2 cyclicBarrierDemo2 = new CyclicBarrierDemo2();
cyclicBarrierDemo2.count();
执行结果
[16:51:15--pool-1-thread-1] put value [0] and await.
[16:51:15--pool-1-thread-3] put value [2] and await.
[16:51:15--pool-1-thread-4] put value [3] and await.
[16:51:15--pool-1-thread-2] put value [1] and await.
[16:51:15--pool-1-thread-2] count value [6].
核心思想
内部组合了非公平策略的重入锁,借助AQS实现线程的阻塞和唤醒,主要依赖条件队列。关于AQS的Condition。
组成
构造函数
// 创建一个新的 CyclicBarrier,它将在给定数量的参与方(线程)等待它时触发,并且在触发障碍时不执行预定义的操作。 parties代表线程数
public CyclicBarrier(int parties)
this(parties, null);
// 创建一个新的 CyclicBarrier,当给定数量的参与方(线程)正在等待它时,它将触发,并且当障碍被触发时,它将执行给定的屏障动作,由最后一个进入屏障的线程执行。barrierAction代表当屏障被触发时执行的命令,如果没有动作则为 null
public CyclicBarrier(int parties, Runnable barrierAction)
// 参与者必须大于0
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
内部类 Generation
屏障的每次使用都表示为一个生成实例。每当栅栏被触发或重置时,实例就会发生变化。可以有许多代(轮次)与使用屏障的线程相关联
- 由于锁定可能分配给等待线程的非确定性方式
- 但一次只能激活其中一个(应用计数的那个)并且所有其余的要么坏要么绊倒。
如果有中断但没有后续重置,则不需要活动生成实例。
private static class Generation
boolean broken = false;
成员变量
// 用于保护障碍入口的锁
private final ReentrantLock lock = new ReentrantLock();
// 条件对象
private final Condition trip = lock.newCondition();
// 参与线程的数量
private final int parties;
// 由最后一个进入 barrier 的线程执行的操作
private final Runnable barrierCommand;
// 正在等待进入屏障的线程数量
private int count;
核心方法
方法名 | 描述 |
---|---|
int getParties() | 返回触发此障碍所需的参与方数量。 |
int await() throws InterruptedException, BrokenBarrierException | 等待所有线程都在障碍点上调用了await。 如果当前线程不是最后到达的,则出于线程调度目的将其禁用并处于休眠状态,直到发生以下情况之一: 1.最后一个线程到达 2.其他线程中断当前线程 3.其他一些线程中断了其他等待线程之一 4.其他一些线程在等待屏障时超时 5.其他一些线程在此屏障上调用重置 如果当前线程: 1.在进入此方法时设置其中断状态 2.等待时被打断 会抛出 InterruptedException 并清除当前线程的中断状态。 如果在任何线程等待时屏障被重置,或者在调用 await 时或者在任何线程正在等待时屏障被破坏,则抛出 BrokenBarrierException。 如果任何线程在等待时被中断,那么所有其他等待的线程都会抛出 BrokenBarrierException 并且屏障处于损坏状态。 如果当前线程是最后到达的线程,并且在构造函数中提供了非空屏障操作,则当前线程在允许其他线程继续之前运行该操作。 如果在屏障操作期间发生异常,则该异常将在当前线程中传播,并且屏障处于损坏状态。 |
int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException | 在 await() 的基础上增加了等待超时时长,如果指定的等待时间过去,则抛出 TimeoutException。 如果时间小于或等于零,则该方法根本不会等待。 |
boolean isBroken() | 查询此屏障是否处于损坏状态。 |
void reset() | 将屏障重置为其初始状态。 如果任何一方当前在屏障处等待,他们将返回一个 BrokenBarrierException。 请注意,由于其他原因发生破损后的重置可能会很复杂; 线程需要以其他方式重新同步,并选择一个执行重置。 相反,最好为后续使用创建一个新的屏障。 |
int getNumberWaiting() | 返回当前在屏障处等待的参与方数量。 此方法主要用于调试和断言。 |
dowait(boolean timed, long nanos)
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException
// 保存当前锁
final ReentrantLock lock = this.lock;
// 争夺锁
lock.lock();
try
// 保存代
final Generation g = generation;
// 检查屏障是否损坏
if (g.broken)
throw new BrokenBarrierException();
// 检查线程是否中断
if (Thread.interrupted())
// 损坏当前屏障,并且唤醒所有的线程,只有拥有锁的时候才会调用
breakBarrier();
throw new InterruptedException();
// 减少等待进入屏障的线程数
int index = --count;
// index为0,代表定义的所需线程已经都到达屏障
if (index == 0) // tripped
// 执行动作的标识
boolean ranAction = false;
try
// 是否有屏障命令
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 此函数在所有线程进入屏障后会被调用,即生成下一个版本,所有线程又可以重新进入到屏障中
nextGeneration();
return 0;
finally
// 若屏障命令执行失败 需要损坏当前屏障
if (!ranAction)
breakBarrier();
// loop until tripped, broken, interrupted, or timed out
// 无限循环-直到需要出现以下情况之一:
// 1.线程都到达屏障
// 2.屏障损坏
// 3.中断
// 4.等待超时
for (;;)
try
// 没有设置等待时间-一直等
if (!timed)
trip.await();
// 设置等待时间且没超时,调用超时的等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
catch (InterruptedException ie)
// 若当前线程发生中断,需要损坏当代屏障
if (g == generation && ! g.broken)
breakBarrier();
throw ie;
else
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
// 再次检查屏障是否损坏
if (g.broken)
throw new BrokenBarrierException();
// 代数不对 返回下标
if (g != generation)
return index;
// 设置等待时间,并且已经过了超时时间 损坏屏障 抛出超时异常
if (timed && nanos <= 0L)
breakBarrier();
throw new TimeoutException();
finally
lock.unlock();
nextGeneration()
private void nextGeneration()
// 唤醒所有线程
trip.signalAll();
// 重置屏障执行的所需的线程数
count = parties;
// 新生一代
generation = new Generation();
breakBarrier()
private void breakBarrier()
// 当前代屏障损坏标识
generation.broken = true;
// 恢复正在等待进入屏障的线程数量
count = parties;
// 唤醒所有线程
trip.signalAll();
以上是关于JUC系列同步工具类之CyclicBarrier的主要内容,如果未能解决你的问题,请参考以下文章
JUC常用同步工具类——CountDownLatch,CyclicBarrier,Semaphore
JUC——线程同步辅助工具类(Semaphore,CountDownLatch,CyclicBarrier)
JUC并发编程 共享模式之工具 JUC CyclicBarrier(循环栅栏 与CountdownLatch最大的不同是可以重值倒计时) -- CyclicBarrier介绍使用注意事项