J.U.C工具类中的CountDownLatch和CyclicBarrier
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了J.U.C工具类中的CountDownLatch和CyclicBarrier相关的知识,希望对你有一定的参考价值。
讲解CyclicBarrier
API文档是这样介绍的:一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点(common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此使CyclicBarrier很有用。因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。
CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。
通俗点讲:让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有别屏障拦截的线程才会干活。
类图:
上图可知:CyclicBarrier的内部是使用重入锁ReetrantLock和Condition。
构造函数
源码
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and
* does not perform a predefined action when the barrier is tripped.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
Parties:表示拦截线程的数量。
barrierAction:为CyclicBarrier接受的Runnable命令,用于线程到达屏障时,优先执行barrierAction,用于处理复杂的业务场景.
<br/>
现在我们来看看CyclicBarrier最重要的函数await():
代码:
await()方法:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
dowait()方法:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//分代
final Generation g = generation;
//当前generation“已损坏”,抛出BrokenBarrierException异常
//抛出该异常一般都是某个线程在等待某个处于“断开”状态的CyclicBarrie
if (g.broken)
//当某个线程试图等待处于断开状态的 barrier 时,或者 barrier 进入断开状态而线程处于等待状态时,抛出该异常
throw new BrokenBarrierException();
//如果线程中断,终止CyclicBarrier
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//进来一个线程 count - 1
int index = --count;
//count == 0 表示所有线程均已到位,触发Runnable任务
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//触发任务
if (command != null)
command.run();
ranAction = true;
//唤醒所有等待线程,并更新generation
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
for (;;) {
try {
//如果不是超时等待,则调用Condition.await()方法等待
if (!timed)
trip.await();
else if (nanos > 0L)
//超时等待,调用Condition.awaitNanos()方法等待
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We‘re about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
//generation已经更新,返回index
if (g != generation)
return index;
//“超时等待”,并且时间已到,终止CyclicBarrier,并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//释放锁
lock.unlock();
}
}
我们再看看jdkAPI文档描述:
public int await()
throws InterruptedException,
BrokenBarrierException
在所有 参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
如果当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生以下情况之一前,该线程将一直处于休眠状态:
最后一个线程到达;或者
其他某个线程中断当前线程;或者
其他某个线程中断另一个等待线程;或者
其他某个线程在等待 barrier 时超时;或者
其他某个线程在此 barrier 上调用 reset()。
如果当前线程:
在进入此方法时已经设置了该线程的中断状态;或者
在等待时被中断
则抛出 InterruptedException,并且清除当前线程的已中断状态。
如果在线程处于等待状态时 barrier 被 reset(),或者在调用 await 时 barrier 被损坏,抑或任意一个线程正处于等待状态,则抛出 BrokenBarrierException 异常。
如果任何线程在等待时被 中断,则其他所有等待线程都将抛出 BrokenBarrierException 异常,并将 barrier 置于损坏状态。
如果当前线程是最后一个将要到达的线程,并且构造方法中提供了一个非空的屏障操作,则在允许其他线程继续运行之前,当前线程将运行该操作。如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,并将 barrier 置于损坏状态。
返回:
到达的当前线程的索引,其中,索引 getParties() - 1 指示将到达的第一个线程,零指示最后一个到达的线程
抛出:
InterruptedException - 如果当前线程在等待时被中断
BrokenBarrierException - 如果 另一个 线程在当前线程等待时被中断或超时,或者重置了 barrier,或者在调用 await 时 barrier 被损坏,抑或由于异常而导致屏障操作(如果存在)失败。
示例:
public class CyclicBarrierTest {
public static CyclicBarrier cyclicBarrier;
public static class WorkThread extends Thread
{
public void run()
{
try {
System.out.println(Thread.currentThread().getName()+"已进入");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName()+"离开");
} catch (InterruptedException | BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] args) {
cyclicBarrier=new CyclicBarrier(4,new Runnable() {
public void run()
{
System.out.println("人已经到齐");
}
});
for(int i=0;i<4;i++)
{
new WorkThread().start();
}
}
}
运行结果:
Thread-0已进入
Thread-2已进入
Thread-1已进入
Thread-3已进入
人已经到齐
Thread-3离开
Thread-0离开
Thread-1离开
Thread-2离开
讲解CountDownLatch
看一下JDK文档的描述:
-
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
-
用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。
-
CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。
- CountDownLatch 的一个有用特性是,它不要求调用 countDown 方法的线程等到计数到达零时才继续,而在所有线程都能通过之前,它只是阻止任何线程继续通过一个 await。
构造函数
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
从上述可以看出:count :在线程能通过 await() 之前,必须调用 countDown() 的次数
说到这我们来看看CountDownLatch中的连个方法:countDown()和await();
await()方法:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
描述:
使当前线程在锁存器倒计数至零之前一直等待,除非线程被 中断。
如果当前计数为零,则此方法立即返回。
如果当前计数大于零,则出于线程调度目的,将禁用当前线程,且在发生以下两种情况之一前,该线程将一直处于休眠状态:
由于调用 countDown() 方法,计数到达零;或者
其他某个线程中断当前线程。
如果当前线程:
在进入此方法时已经设置了该线程的中断状态;或者
在等待时被中断,
则抛出 InterruptedException,并且清除当前线程的已中断状态。
countDown()方法:
public void countDown() {
sync.releaseShared(1);
}
描述:递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
如果当前计数大于零,则将计数减少。如果新的计数为零,出于线程调度目的,将重新启用所有的等待线程。
如果当前计数等于零,则不发生任何操作。
示例:
public class CountDownLatchTest {
private static class WorkThread extends Thread
{
private CountDownLatch cdl;
private int sleepSecond;
public WorkThread(String name, CountDownLatch cdl, int sleepSecond)
{
super(name);
this.cdl = cdl;
this.sleepSecond = sleepSecond;
}
public void run()
{
try
{
System.out.println(this.getName() + "启动了,时间为" + System.currentTimeMillis());
Thread.sleep(sleepSecond * 1000);
cdl.countDown();
System.out.println(this.getName() + "执行完了,时间为" + System.currentTimeMillis());
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
private static class DoneThread extends Thread
{
private CountDownLatch cdl;
public DoneThread(String name, CountDownLatch cdl)
{
super(name);
this.cdl = cdl;
}
public void run()
{
try
{
System.out.println(this.getName() + "要等待了, 时间为" + System.currentTimeMillis());
cdl.await();
System.out.println(this.getName() + "等待完了, 时间为" + System.currentTimeMillis());
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception
{
CountDownLatch cdl = new CountDownLatch(3);
DoneThread dt0 = new DoneThread("DoneThread1", cdl);
DoneThread dt1 = new DoneThread("DoneThread2", cdl);
dt0.start();
dt1.start();
WorkThread wt0 = new WorkThread("WorkThread1", cdl, 2);
WorkThread wt1 = new WorkThread("WorkThread2", cdl, 3);
WorkThread wt2 = new WorkThread("WorkThread3", cdl, 4);
wt0.start();
wt1.start();
wt2.start();
}
}
运行结果:
DoneThread2要等待了, 时间为1529917959491
DoneThread1要等待了, 时间为1529917959491
WorkThread1启动了,时间为1529917959492
WorkThread2启动了,时间为1529917959492
WorkThread3启动了,时间为1529917959492
WorkThread1执行完了,时间为1529917961493
WorkThread2执行完了,时间为1529917962492
WorkThread3执行完了,时间为1529917963492
DoneThread1等待完了, 时间为1529917963492
DoneThread2等待完了, 时间为1529917963492
两者之间的比较
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。
以上是关于J.U.C工具类中的CountDownLatch和CyclicBarrier的主要内容,如果未能解决你的问题,请参考以下文章