concurrent同步屏障 CyclicBarrier & 源码分析
Posted 人生如逆旅,我亦是行人。
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了concurrent同步屏障 CyclicBarrier & 源码分析相关的知识,希望对你有一定的参考价值。
参考文档:
Java多线程系列--“JUC锁”10之 CyclicBarrier原理和示例:https://www.cnblogs.com/skywang12345/p/3533995.html
简介
CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环的 barrier。基于ReentrantLock实现
举个栗子
/** * 简单模拟一下对战平台中玩家需要完全准备好了,才能进入游戏的场景。 * * @author BFD_526 * */ public class CyclicBarrierTest { public static void main(String[] args) { test(); } // 同步屏障 static void test() { ExecutorService service = Executors.newFixedThreadPool(5); CyclicBarrier barrier = new CyclicBarrier(5); for (int i = 0; i < 5; i++) { service.execute(new Player("玩家" + i, barrier)); } service.shutdown(); } // 同步屏障重置 static void test1() { ExecutorService service = Executors.newFixedThreadPool(5); CyclicBarrier barrier = new CyclicBarrier(5); for (int i = 0; i < 5; i++) { service.execute(new Player("玩家" + i, barrier)); } for (int i = 5; i < 10; i++) { service.execute(new Player("玩家" + i, barrier)); } service.shutdown(); } // 在同步屏障结束后,启动优先线程 static void test2() { ExecutorService service = Executors.newFixedThreadPool(5); CyclicBarrier ba = new CyclicBarrier(5, new Runnable() { @Override public void run() { System.out.println("所有玩家已就位"); } }); for (int i = 0; i < 5; i++) { service.execute(new Player("玩家" + i, ba)); } } } class Player implements Runnable { private final String name; private final CyclicBarrier barrier; public Player(String name, CyclicBarrier barrier) { this.name = name; this.barrier = barrier; } public void run() { try { TimeUnit.SECONDS.sleep(1 + (new Random().nextInt(3))); System.out.println(name + "已准备,等待其他玩家准备..."); barrier.await(); TimeUnit.SECONDS.sleep(1 + (new Random().nextInt(3))); System.out.println(name + "已加入游戏"); } catch (InterruptedException e) { System.out.println(name + "离开游戏"); } catch (BrokenBarrierException e) { System.out.println(name + "离开游戏"); } } }
源码分析
函数列表
CyclicBarrier(int parties):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作
CyclicBarrier(int parties, Runnable barrierAction):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行
int await():在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待
int await(long timeout, TimeUnit unit):在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间
int getNumberWaiting():返回当前在屏障处等待的参与者数目
int getParties():返回要求启动此 barrier 的参与者数目
boolean isBroken():查询此屏障是否处于损坏状态
void reset():将屏障重置为其初始状态
await()
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } }
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 获取“独占锁(lock)” lock.lock(); try { // 保存“当前的generation” final Generation g = generation; // 若“当前generation已损坏”,则抛出异常。 if (g.broken) throw new BrokenBarrierException(); // 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 将“count计数器”-1 int index = --count; // 如果index=0,则意味着“有parties个线程到达barrier” if (index == 0) { // tripped boolean ranAction = false; try { // 如果barrierCommand不为null,则执行该动作 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 唤醒所有等待线程,并更新generation nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // 当前线程一直阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生, // 当前线程才继续执行。 for (;;) { try { // 如果不是“超时等待”,则调用awati()进行等待;否则,调用awaitNanos()进行等待 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 如果等待过程中,线程被中断,则执行下面的函数 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } // 如果“当前generation已经损坏”,则抛出异常 if (g.broken) throw new BrokenBarrierException(); // 如果“generation已经换代”,则返回index if (g != generation) return index; // 如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 释放“独占锁(lock)” lock.unlock(); } }
generation是CyclicBarrier的一个成员变量,它的定义如下:
private Generation generation = new Generation(); private static class Generation { boolean broken = false; }
在CyclicBarrier中,同一批的线程属于同一代,即同一个Generation;CyclicBarrier中通过generation对象,记录属于哪一代
当有parties个线程到达barrier,generation就会被更新换代
换代:
//换代
private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); }
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
以上是关于concurrent同步屏障 CyclicBarrier & 源码分析的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 DISPATCH_QUEUE_CONCURRENT 和屏障块复制 FIFO 队列?
java.util.concurrent CyclicBarrier类