JUC系列同步工具类之CyclicBarrier

Posted 顧棟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC系列同步工具类之CyclicBarrier相关的知识,希望对你有一定的参考价值。

同步屏障 CyclicBarrier

文章目录


"循环屏障"是一种同步辅助工具,它允许一组线程相互等待以达到共同的屏障点。 CyclicBarriers 在涉及固定大小的线程组的程序中很有用,这些线程组必须偶尔相互等待。 屏障被称为循环的,因为它可以在等待线程被释放后重新使用。

CyclicBarrier 支持一个可选的 Runnable 命令,该命令在每个屏障点运行一次,在队伍中的最后一个线程到达之后,但在任何线程被释放之前。 此屏障操作对于在任何一方继续之前更新共享状态很有用。

示例

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.*;

/**
 * 分布计算再合并
 */
public class CyclicBarrierDemo2 implements Runnable 

    /**
     * 当最后一个线程到达屏障点,执行此方法,汇总各个线程的值
     */
    @Override
    public void run() 
        int result = 0;
        // 遍历
        for (Map.Entry<String, Integer> ss : s.entrySet()) 
            result += ss.getValue();
        
        s.put("result", result);
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] count value [" + result + "].");
    

    /**
     * 定义一个需要4个线程的同步屏障,并在最后一个到达屏障的线程后,执行当前类的run方法
     */
    private final CyclicBarrier c = new CyclicBarrier(4, this);

    /**
     * 4个线程的线程池
     */
    private final Executor executor = Executors.newFixedThreadPool(4);

    /**
     * 用来存放各个线程运算结果的值
     */
    private final ConcurrentHashMap<String, Integer> s = new ConcurrentHashMap<>();

    /**
     * 计算api 4个线程分别向变量s中塞值
     */
    private void count() 
        for (int i = 0; i < 4; i++) 
            int finalI = i;
            executor.execute(() -> 
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] put value [" + finalI + "] and await.");
                s.put(Thread.currentThread().getName(), finalI);
                try 
                    c.await();
                 catch (InterruptedException | BrokenBarrierException e) 
                    e.printStackTrace();
                
            );
        
    

    public static void main(String[] args) 
        CyclicBarrierDemo2 cyclicBarrierDemo2 = new CyclicBarrierDemo2();
        cyclicBarrierDemo2.count();
    


执行结果

[16:51:15--pool-1-thread-1] put value [0] and await.
[16:51:15--pool-1-thread-3] put value [2] and await.
[16:51:15--pool-1-thread-4] put value [3] and await.
[16:51:15--pool-1-thread-2] put value [1] and await.
[16:51:15--pool-1-thread-2] count value [6].

核心思想

内部组合了非公平策略的重入锁,借助AQS实现线程的阻塞和唤醒,主要依赖条件队列。关于AQS的Condition

组成

构造函数

    // 创建一个新的 CyclicBarrier,它将在给定数量的参与方(线程)等待它时触发,并且在触发障碍时不执行预定义的操作。 parties代表线程数
	public CyclicBarrier(int parties) 
        this(parties, null);
    

	// 创建一个新的 CyclicBarrier,当给定数量的参与方(线程)正在等待它时,它将触发,并且当障碍被触发时,它将执行给定的屏障动作,由最后一个进入屏障的线程执行。barrierAction代表当屏障被触发时执行的命令,如果没有动作则为 null
    public CyclicBarrier(int parties, Runnable barrierAction) 
        // 参与者必须大于0
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    

内部类 Generation

屏障的每次使用都表示为一个生成实例。每当栅栏被触发或重置时,实例就会发生变化。可以有许多代(轮次)与使用屏障的线程相关联

  • 由于锁定可能分配给等待线程的非确定性方式
  • 但一次只能激活其中一个(应用计数的那个)并且所有其余的要么坏要么绊倒。

如果有中断但没有后续重置,则不需要活动生成实例。

    private static class Generation 
        boolean broken = false;
    

成员变量

// 用于保护障碍入口的锁
private final ReentrantLock lock = new ReentrantLock();
// 条件对象
private final Condition trip = lock.newCondition();
// 参与线程的数量
private final int parties;
// 由最后一个进入 barrier 的线程执行的操作
private final Runnable barrierCommand;
// 正在等待进入屏障的线程数量
private int count;

核心方法

方法名描述
int getParties()返回触发此障碍所需的参与方数量。
int await() throws InterruptedException, BrokenBarrierException等待所有线程都在障碍点上调用了await。
如果当前线程不是最后到达的,则出于线程调度目的将其禁用并处于休眠状态,直到发生以下情况之一:
1.最后一个线程到达
2.其他线程中断当前线程
3.其他一些线程中断了其他等待线程之一
4.其他一些线程在等待屏障时超时
5.其他一些线程在此屏障上调用重置
如果当前线程:
1.在进入此方法时设置其中断状态
2.等待时被打断
会抛出 InterruptedException 并清除当前线程的中断状态。

如果在任何线程等待时屏障被重置,或者在调用 await 时或者在任何线程正在等待时屏障被破坏,则抛出 BrokenBarrierException。

如果任何线程在等待时被中断,那么所有其他等待的线程都会抛出 BrokenBarrierException 并且屏障处于损坏状态
如果当前线程是最后到达的线程,并且在构造函数中提供了非空屏障操作,则当前线程在允许其他线程继续之前运行该操作。 如果在屏障操作期间发生异常,则该异常将在当前线程中传播,并且屏障处于损坏状态。
int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException在 await() 的基础上增加了等待超时时长,如果指定的等待时间过去,则抛出 TimeoutException。 如果时间小于或等于零,则该方法根本不会等待。
boolean isBroken()查询此屏障是否处于损坏状态。
void reset()将屏障重置为其初始状态。 如果任何一方当前在屏障处等待,他们将返回一个 BrokenBarrierException。 请注意,由于其他原因发生破损后的重置可能会很复杂; 线程需要以其他方式重新同步,并选择一个执行重置。 相反,最好为后续使用创建一个新的屏障。
int getNumberWaiting()返回当前在屏障处等待的参与方数量。 此方法主要用于调试和断言。

dowait(boolean timed, long nanos)

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException 
        // 保存当前锁
        final ReentrantLock lock = this.lock;
        // 争夺锁
        lock.lock();
        try 
            // 保存代
            final Generation g = generation;

            // 检查屏障是否损坏
            if (g.broken)
                throw new BrokenBarrierException();

            // 检查线程是否中断
            if (Thread.interrupted()) 
                // 损坏当前屏障,并且唤醒所有的线程,只有拥有锁的时候才会调用
                breakBarrier();
                throw new InterruptedException();
            

            // 减少等待进入屏障的线程数
            int index = --count;
            // index为0,代表定义的所需线程已经都到达屏障
            if (index == 0)   // tripped
                // 执行动作的标识
                boolean ranAction = false;
                try 
                    // 是否有屏障命令
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 此函数在所有线程进入屏障后会被调用,即生成下一个版本,所有线程又可以重新进入到屏障中
                    nextGeneration();
                    return 0;
                 finally 
                    // 若屏障命令执行失败 需要损坏当前屏障
                    if (!ranAction)
                        breakBarrier();
                
            

            // loop until tripped, broken, interrupted, or timed out
            // 无限循环-直到需要出现以下情况之一:
            // 1.线程都到达屏障
            // 2.屏障损坏
            // 3.中断
            // 4.等待超时
            for (;;) 
                try 
                    // 没有设置等待时间-一直等
                    if (!timed)
                        trip.await();
                    // 设置等待时间且没超时,调用超时的等待
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                 catch (InterruptedException ie) 
                    // 若当前线程发生中断,需要损坏当代屏障
                    if (g == generation && ! g.broken) 
                        breakBarrier();
                        throw ie;
                     else 
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    
                
                // 再次检查屏障是否损坏
                if (g.broken)
                    throw new BrokenBarrierException();
                // 代数不对 返回下标
                if (g != generation)
                    return index;

                // 设置等待时间,并且已经过了超时时间 损坏屏障 抛出超时异常
                if (timed && nanos <= 0L) 
                    breakBarrier();
                    throw new TimeoutException();
                
            
         finally 
            lock.unlock();
        
    

nextGeneration()

    private void nextGeneration() 
        // 唤醒所有线程
        trip.signalAll();
        // 重置屏障执行的所需的线程数
        count = parties;
        // 新生一代
        generation = new Generation();
    

breakBarrier()

    private void breakBarrier() 
        // 当前代屏障损坏标识
        generation.broken = true;
        // 恢复正在等待进入屏障的线程数量
        count = parties;
        // 唤醒所有线程
        trip.signalAll();
    

以上是关于JUC系列同步工具类之CyclicBarrier的主要内容,如果未能解决你的问题,请参考以下文章

JUC系列同步工具类之ThreadLocal

Java多线程同步工具类之CyclicBarrier

Java并发工具类之同步屏障CyclicBarrier

JUC常用同步工具类——CountDownLatch,CyclicBarrier,Semaphore

JUC——线程同步辅助工具类(Semaphore,CountDownLatch,CyclicBarrier)

JUC并发编程 共享模式之工具 JUC CyclicBarrier(循环栅栏 与CountdownLatch最大的不同是可以重值倒计时) -- CyclicBarrier介绍使用注意事项