java并发控制类之CountDownLatchCyclicBarrierSemaphore
Posted Leo Han
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java并发控制类之CountDownLatchCyclicBarrierSemaphore相关的知识,希望对你有一定的参考价值。
java并发包中提供了CountDownLatch、CyclicBarrier、Semaphore这三个类可以来实现一些线程之间的状态同步。这三个本质上都是基于java并发包的AQS来实现的。
CountDownLatch
首先来看CountDownLatch
,其可可以让一个线程等待其他线程都执行完之后才开始执行。其典型用法如下:
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(new Runnable()
@Override
public void run()
System.out.println("start run "+Thread.currentThread().getId());
try
Thread.sleep(3000);
catch(Exception e)
System.out.println("fininsh run "+Thread.currentThread().getId());
countDownLatch.countDown();
).start();
new Thread(new Runnable()
@Override
public void run()
System.out.println("start run "+Thread.currentThread().getId());
try
Thread.sleep(3000);
catch(Exception e)
System.out.println("fininsh run "+Thread.currentThread().getId());
countDownLatch.countDown();
).start();
new Thread(new Runnable()
@Override
public void run()
System.out.println("start run "+Thread.currentThread().getId());
try
Thread.sleep(3000);
catch(Exception e)
System.out.println("fininsh run "+Thread.currentThread().getId());
countDownLatch.countDown();
).start();
countDownLatch.await();
System.out.println("all finish");
可以看到,初始CountDownLatch
计数器设置为3,每调用一次countDownLatch.countDown();
则计数器减 1 ,而调用countDownLatch.await();
时,必须等CountDownLatch的计数器为0,否则将会阻塞等待。
在CountDownLatch
内部,自定义了一个AbstractQueuedSynchronizer
实现类。
public class CountDownLatch
private static final class Sync extends AbstractQueuedSynchronizer
Sync(int count)
setState(count);
int getCount()
return getState();
protected int tryAcquireShared(int acquires)
return (getState() == 0) ? 1 : -1;
protected boolean tryReleaseShared(int releases)
for (;;)
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
...
public CountDownLatch(int count)
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
可以看到,在构造的时候直接将count值设置到了AQS的state上
。
当执行countDown的时候,直接将state-1
,而在执行await时,最终会调用如下
:
protected int tryAcquireShared(int acquires)
return (getState() == 0) ? 1 : -1;
如果state!=0时,则会进入阻塞,否则返回成功,继续执行
‘
CyclicBarrier
CyclicBarrier功能与CountDownLatch类似,但是CyclicBarrier是可以持续使用的,
CountDownLatch在使用完一次之后,state=0之后,后续无法继续使用,而
CyclicBarrier则在使用完之后,可以继续复用。
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
new Thread(new Runnable()
@Override
public void run()
try
System.out.println("start run "+Thread.currentThread().getId());
cyclicBarrier.await();
catch(Exception e)
).start();
new Thread(new Runnable()
@Override
public void run()
try
System.out.println("start run "+Thread.currentThread().getId());
cyclicBarrier.await();
catch(Exception e)
).start();
cyclicBarrier.await();
System.out.println("all finish");
输出:
start run 11
start run 12
all finish
而CyclicBarrier
部分源码如下:
public class CyclicBarrier
private static class Generation
boolean broken = false;
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
private int count;
private void nextGeneration()
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
当我们执行await
方法时:
public int await() throws InterruptedException, BrokenBarrierException
try
return dowait(false, 0L);
catch (TimeoutException toe)
throw new Error(toe); // cannot happen
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException
final ReentrantLock lock = this.lock;
lock.lock();
try
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted())
breakBarrier();
throw new InterruptedException();
int index = --count;
if (index == 0) // tripped
boolean ranAction = false;
try
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
finally
if (!ranAction)
breakBarrier();
// loop until tripped, broken, interrupted, or timed out
for (;;)
try
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
catch (InterruptedException ie)
if (g == generation && ! g.broken)
breakBarrier();
throw ie;
else
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L)
breakBarrier();
throw new TimeoutException();
finally
lock.unlock();
这里在执行dowait
的时候,首先首先会通过lock加锁,然后会对count执行自减操作,如果--count之后为0,则表示所有线程都到达了统一状态,这时候会通过nextGeneration方法,唤醒所有等待线程,同时将count重新设置,而如果--count不为0,这时候,通过lock.Condition让线程阻塞等待
Semaphore
Semaphore为信号量的意思,通过semaphore.acquire()
获取一个资源,semaphore.release();
将资源归还,可以用在限流场景下,比如支持同时并发200,那么信号量设置为200,每次来一个请求通过semaphore.acquire()
获取一个资源,完成之后通过semaphore.release();
将资源归还,如果没有资源,那么semaphore.acquire()
将阻塞直到有资源可用。
以上是关于java并发控制类之CountDownLatchCyclicBarrierSemaphore的主要内容,如果未能解决你的问题,请参考以下文章
『死磕Java并发编程系列』并发编程工具类之CountDownLatch
『死磕Java并发编程系列』并发编程工具类之CountDownLatch
『死磕Java并发编程系列』并发编程工具类之CountDownLatch