Java Review - 并发编程_ 回环屏障CyclicBarrier原理&源码剖析
Posted 小小工匠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java Review - 并发编程_ 回环屏障CyclicBarrier原理&源码剖析相关的知识,希望对你有一定的参考价值。
文章目录
Pre
Java Review - 并发编程_ CountDownLatch原理&源码剖析介绍的CountDownLatch在解决多个线程同步方面相对于调用线程的join方法已经有了不少优化,但是CountDownLatch的计数器是一次性的,也就是等到计数器值变为0后,再调用CountDownLatch的await和countdown方法都会立刻返回,这就起不到线程同步的效果了。
所以为了满足计数器可以重置的需要,JDK开发组提供了CyclicBarrier类,并且CyclicBarrier类的功能并不限于CountDownLatch的功能。从字面意思理解,CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。
这里之所以叫作回环是因为当所有等待线程执行完毕,并重置CyclicBarrier的状态后它可以被重用。
之所以叫作屏障是因为线程调用await方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行。
小Demo
需求如下: 使用两个线程去执行一个被分解的任务A,当两个线程把自己的任务都执行完毕后再对它们的结果进行汇总处理。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/12/18 15:01
* @mark: show me the code , change the world
*/
public class CycleBarrierTest
// 创建一个CycleBarrier实例,添加一个所有子线程全部到达屏障后的执行的任务
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println(Thread.currentThread().getName() + " merge result"));
public static void main(String[] args)
// 创建一个线程数量固定为2的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 将线程A 提交到线程池
executorService.submit(() ->
try
System.out.println(Thread.currentThread().getName() + " begin to handle task1-1");
System.out.println(Thread.currentThread().getName() + " enter into cyclicBarrier");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " enter out cyclicBarrier");
catch (InterruptedException e)
e.printStackTrace();
catch (BrokenBarrierException e)
e.printStackTrace();
);
// 将线程B 提交到线程池
executorService.submit(() ->
try
System.out.println(Thread.currentThread().getName() + " begin to handle task1-2");
System.out.println(Thread.currentThread().getName() + " enter into cyclicBarrier");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " enter out cyclicBarrier");
catch (InterruptedException e)
e.printStackTrace();
catch (BrokenBarrierException e)
e.printStackTrace();
);
// 关闭线程池
executorService.shutdown();
【每次运行的结果可能不尽相同,但核心的流程是一致的】
-
首先创建了一个CyclicBarrier对象,其第一个参数为计数器初始值,第二个参数Runable是当计数器值为0时需要执行的任务。
-
在main函数里面首先创建了一个大小为2的线程池,然后添加两个子任务到线程池,每个子任务在执行完自己的逻辑后会调用await方法。一开始计数器值为2,当第一个线程调用await方法时,计数器值会递减为1。
-
由于此时计数器值不为0,所以当前线程就到了屏障点而被阻塞。然后第二个线程调用await时,会进入屏障,计数器值也会递减.
-
现在计数器值为0,这时就会去执行CyclicBarrier构造函数中的任务,执行完毕后退出屏障点,并且唤醒被阻塞的第二个线程,这时候第一个线程也会退出屏障点继续向下运行。
由此可见多个线程之间是相互等待的,假如计数器值为N,那么随后调用await方法的N-1个线程都会因为到达屏障点而被阻塞,当第N个线程调用await后,计数器值为0了,这时候第N个线程才会发出通知唤醒前面的N-1个线程。也就是当全部线程都到达屏障点时才能一块继续向下执行。
对于这个例子来说,使用CountDownLatch也可以得到类似的输出结果。下面再举个例子来说明CyclicBarrier的可复用性。
需求: 假设一个任务由阶段1、阶段2和阶段3组成,每个线程要串行地执行阶段1、阶段2和阶段3,当多个线程执行该任务时,必须要保证所有线程的阶段1全部完成后才能进入阶段2执行,当所有线程的阶段2全部完成后才能进入阶段3执行。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/12/18 15:23
* @mark: show me the code , change the world
*/
public class CycleBarrierTest2
// 创建一个CycleBarrier实例,添加一个所有子线程全部到达屏障后的执行的任务
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println(Thread.currentThread().getName() + " 阶段任务全部线程执行结束....开启下一轮"));
public static void main(String[] args)
// 创建一个线程数量固定为2的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 将线程A 提交到线程池
executorService.submit(() ->
try
System.out.println(Thread.currentThread().getName() + " execute step1");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " execute step2");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " execute step3");
catch (InterruptedException e)
e.printStackTrace();
catch (BrokenBarrierException e)
e.printStackTrace();
);
// 将线程B 提交到线程池
executorService.submit(() ->
try
System.out.println(Thread.currentThread().getName() + " execute step1");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " execute step2");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " execute step3");
catch (InterruptedException e)
e.printStackTrace();
catch (BrokenBarrierException e)
e.printStackTrace();
);
// 关闭线程池
executorService.shutdown();
在如上代码中,每个子线程在执行完阶段1后都调用了await方法,等到所有线程都到达屏障点后才会一块往下执行,这就保证了所有线程都完成了阶段1后才会开始执行阶段2。然后在阶段2后面调用了await方法,这保证了所有线程都完成了阶段2后,才能开始阶段3的执行。这个功能使用单个CountDownLatch是无法完成的。
类图结构
由以上类图可知:
-
CyclicBarrier基于独占锁实现,本质底层还是基于AQS的。
-
parties用来记录线程个数,这里表示多少线程调用await后,所有线程才会冲破屏障继续往下运行
-
count一开始等于parties,每当有线程调用await方法就递减1,当count为0时就表示所有线程都到了屏障点
为何维护parties和count两个变量,只使用count不就可以了?
别忘了CycleBarier是可以被复用的,使用两个变量的原因是,parties始终用来记录总的线程个数,当count计数器值变为0后,会将parties的值赋给count,从而进行复用。这两个变量是在构造CyclicBarrier对象时传递的
/**
* Creates a new @code CyclicBarrier that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke @link #await
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or @code null if there is no action
* @throws IllegalArgumentException if @code parties is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction)
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
-
还有一个变量barrierCommand也通过构造函数传递,这是一个任务,这个任务的执行时机是当所有线程都到达屏障点后。使用lock首先保证了更新计数器count的原子性。另外使用lock的条件变量trip支持线程间使用await和signal操作进行同步。
-
最后,在变量generation内部有一个变量broken,其用来记录当前屏障是否被打破。注意,这里的broken并没有被声明为volatile的,因为是在锁内使用变量,所以不需要声明。
/**
* Each use of the barrier is represented as a generation instance.
* The generation changes whenever the barrier is tripped, or
* is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which @code count applies)
* and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break
* but no subsequent reset.
*/
private static class Generation
boolean broken = false;
CyclicBarrier核心方法源码解读
int await()
当前线程调用CyclicBarrier的该方法时会被阻塞,直到满足下面条件之一才会返回
-
parties个线程都调用了await()方法,也就是线程都到了屏障点
-
其他线程调用了当前线程的interrupt()方法中断了当前线程,则当前线程会抛出InterruptedException异常而返回
-
与当前屏障点关联的Generation对象的broken标志被设置为true时,会抛出BrokenBarrierException异常,然后返回
public int await() throws InterruptedException, BrokenBarrierException
try
return dowait(false, 0L);
catch (TimeoutException toe)
throw new Error(toe); // cannot happen
通过源码可以知道,在内部调用了dowait方法。第一个参数为false则说明不设置超时时间,这时候第二个参数没有意义
int await(long timeout, TimeUnit unit)
当前线程调用CyclicBarrier的该方法时会被阻塞,直到满足下面条件之一才会返回:
-
parties个线程都调用了await()方法,也就是线程都到了屏障点,这时候返回true;
-
设置的超时时间到了后返回false;
-
其他线程调用当前线程的interrupt()方法中断了当前线程,则当前线程会抛出InterruptedException异常然后返回;
-
与当前屏障点关联的Generation对象的broken标志被设置为true时,会抛出BrokenBarrierException异常,然后返回。
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException
return dowait(true, unit.toNanos(timeout));
在内部调用了dowait方法。第一个参数为true则说明设置了超时时间,这时候第二个参数是超时时间。
int dowait(boolean timed, long nanos)
CyclicBarrier的核心功能
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException
final ReentrantLock lock = this.lock;
lock.lock();
try
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted())
breakBarrier();
throw new InterruptedException();
int index = --count;
// 1 如果index=0说明到所有线程都到达了屏障点,此时执行初始化时执行的任务
if (index == 0) // tripped
boolean ranAction = false;
try
final Runnable command = barrierCommand;
if (command != null)
// 2 执行任务
command.run();
ranAction = true;
// 3 激活其他线程因为await方法而被阻塞的线程,并重置CyclicBarrier
nextGeneration();
return 0;
finally
if (!ranAction)
breakBarrier();
// 4 如果index !=0
// loop until tripped, broken, interrupted, or timed out
for (;;)
try
// 5 没有设置超时时间
if (!timed)
trip.await();
// 6 设置超时时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
catch (InterruptedException ie)
if (g == generation && ! g.broken)
breakBarrier();
throw ie;
else
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L)
breakBarrier();
throw new TimeoutException();
finally
lock.unlock();
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
private void nextGeneration()
// 7 从条件队列里唤醒里面的阻塞贤臣
// signal completion of last generation
trip.signalAll();
// 8 重置CyclicBarrier
// set up next generation
count = parties;
generation = new Generation();
以上是dowait方法的核心代码
-
当一个线程调用了dowait方法后,首先会获取独占锁lock,如果创建CycleBarrier时传递的参数为10,那么后面9个调用线程会被阻塞。
-
然后当前获取到锁的线程会对计数器count进行递减操作,递减后count=index=9,因为index!=0所以当前线程会执行代码(4)
-
如果当前线程调用的是无参数的await()方法,则这里timed=false,所以当前线程会被放入条件变量trip的条件阻塞队列,当前线程会被挂起并释放获取的lock锁。
-
如果调用的是有参数的await方法则timed=true,然后当前线程也会被放入条件变量的条件队列并释放锁资源,不同的是当前线程会在指定时间超时后自动被激活。
-
当第一个获取锁的线程由于被阻塞释放锁后,被阻塞的9个线程中有一个会竞争到lock锁,然后执行与第一个线程同样的操作,直到最后一个线程获取到lock锁,此时已经有9个线程被放入了条件变量trip的条件队列里面。
-
最后count=index等于0,所以执行代码(2),如果创建CyclicBarrier时传递了任务,则在其他线程被唤醒前先执行任务,任务执行完毕后再执行代码(3),唤醒其他9个线程,并重置CyclicBarrier,然后这10个线程就可以继续向下运行了。
小结
我们这里通过Demo说明了CycleBarrier与CountDownLatch的不同在于,CycleBarrier是可以复用的,并且CycleBarrier特别适合分段任务有序执行的场景。
然后分析了CycleBarrier通过独占锁ReentrantLock实现计数器原子性更新,并使用条件变量队列来实现线程同步。
以上是关于Java Review - 并发编程_ 回环屏障CyclicBarrier原理&源码剖析的主要内容,如果未能解决你的问题,请参考以下文章
Java Review - 并发编程_ 回环屏障CyclicBarrier原理&源码剖析