PHP并发包5--同步工具CountDownLatchCyclicBarrierSemaphore的实现原理解析
Posted bolne
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PHP并发包5--同步工具CountDownLatchCyclicBarrierSemaphore的实现原理解析相关的知识,希望对你有一定的参考价值。
前言:
JUC中提供了很多同步工具类,比如CountDownLatch、CyclicBarrier、Semaphore等,都可以作用同步手段来实现多线程之间的同步效果
一、CountDownLatch
1.1、CountDownLatch的使用
CountDownLatch可以理解为是同步计数器,作用是允许一个或多个线程等待其他线程执行完成之后才继续执行,比如打dota、LoL或者王者荣耀时,创建了一个五人房,只有当五个玩家都准备了之后,游戏才能正式开始,否则游戏主线程会一直等待着直到玩家全部准备。在玩家没准备之前,游戏主线程会一直处于等待状态。如果把CountDownLatch比做此场景都话,相当于开始定义了匹配游戏需要5个线程,只有当5个线程都准备完成了之后,主线程才会开始进行匹配操作。
CountDownLatch案例如下:
复制代码
1 public static void countDownLatchTest() throws Exception{
2 CountDownLatch latch = new CountDownLatch(5);//定义了需要达到条件都线程为5个线程
3 new Thread(new Runnable() {
4 @Override
5 public void run() {
6 for (int i=0; i<12; i++){
7 try {
8 Thread.sleep(1000L);
9 } catch (InterruptedException e) {
10 e.printStackTrace();
11 }
12 new Thread(new Runnable() {
13 @Override
14 public void run() {
15 long count = latch.getCount();
16 latch.countDown();/相当于准备游戏成功*/
17 if(count > 0) {
18 System.out.println("线程" + Thread.currentThread().getName() + "组队准备,还需等待" + latch.getCount() + "人准备");
19 }else {
20 System.out.println("线程" + Thread.currentThread().getName() + "组队准备,房间已满不可加入");
21 }
22 }
23 }).start();
24 }
25 }
26 }).start();
27 new Thread(new Runnable() {
28 @Override
29 public void run() {
30 try {
31 System.out.println("游戏房间等待玩家加入...");
32 /一直等待到规定数量到线程都完全准备之后才会继续往下执行/
33 latch.await();
34 System.out.println("游戏房间已锁定...");
35 } catch (InterruptedException e) {
36 e.printStackTrace();
37 }
38 }
39 }).start();
40 System.out.println("等待玩家准备中...");
41 /**一直等待到规定数量到线程都完全准备之后才会继续往下执行/
42 latch.await();
43 System.out.println("游戏匹配中...");
44 }
复制代码
执行结果如下:
复制代码
1 等待玩家准备中...
2 游戏房间等待玩家加入...
3 线程Thread-2组队准备,还需等待4人准备
4 线程Thread-3组队准备,还需等待3人准备
5 线程Thread-4组队准备,还需等待2人准备
6 线程Thread-5组队准备,还需等待1人准备
7 线程Thread-6组队准备,还需等待0人准备
8 游戏匹配中...
9 游戏房间已锁定...
10 线程Thread-7组队准备,房间已满不可加入
11 线程Thread-8组队准备,房间已满不可加入
12 线程Thread-9组队准备,房间已满不可加入
13 线程Thread-10组队准备,房间已满不可加入
14 线程Thread-11组队准备,房间已满不可加入
15 线程Thread-12组队准备,房间已满不可加入
16 线程Thread-13组队准备,房间已满不可加入
复制代码
本案例中有两个线程都调用了latch.await()方法,则这两个线程都会被阻塞,直到条件达成。当5个线程调用countDown方法之后,达到了计数器的要求,则后续再执行countDown方法的效果就无效了,因为CountDownLatch仅一次有效。
1.2、CountDownLatch的实现原理
CountDownLatch的实现原理主要是通过内部类Sync来实现的,内部类Sync是AQS的子类,主要是通过重写AQS的共享式获取和释放同步状态方法来实现的。源码如下:
CountDownLatch初始化时需要定义调用count的次数,然后每调用一次countDown方法都会计数减一,源码如下:
1 public CountDownLatch(int count) {
2 if (count < 0) throw new IllegalArgumentException("count < 0");
3 this.sync = new Sync(count);
4 }
1 public void countDown() {
2 sync.releaseShared(1);
3 }
1 public void await() throws InterruptedException {
2 sync.acquireSharedInterruptibly(1);
3 }
可以看出CountDownLatch的实现逻辑全部都是调用内部类Sync的对应方法实现的,Sync源码如下:
复制代码
1 private static final class Sync extends AbstractQueuedSynchronizer {
2 private static final long serialVersionUID = 4982264981922014374L;
3
4 Sync(int count) {
5 /初始化设置计数值实际就是设置AQS的同步状态值*/
6 setState(count);
7 }
8
9 int getCount() {
10 return getState();
11 }
12
13 protected int tryAcquireShared(int acquires) {
14 return (getState() == 0) ? 1 : -1;
15 }
16
17 /CountDownLatch的countDown方法的实际执行逻辑/
18 protected boolean tryReleaseShared(int releases) {
19 // Decrement count; signal when transition to zero
20 for (;?? {
21 int c = getState();
22 /**当CountDownLatch的计数值为0时,表示倒计数已完成,则直接返回false/
23 if (c == 0)
24 return false;
25 int nextc = c-1;
26 /通过CAS操作来设置同步状态自减1操作*/
27 if (compareAndSetState(c, nextc))
28 /返回当前同步状态值是否为0,只有当状态值为0时才返回true,否则返回false*/
29 return nextc == 0;
30 }
31 }
32 }
复制代码
通过内部类Sync的源码可以分析出,CountDownLatch的实现完整逻辑如下:
1、初始化CountDownLatch实际就是设置了AQS的state为计数的值
2、调用CountDownLatch的countDown方法时实际就是调用AQS的释放同步状态的方法,每调用一次就自减一次state值
3、调用await方法实际就调用AQS的共享式获取同步状态的方法acquireSharedInterruptibly(1),这个方法的实现逻辑就调用子类Sync的tryAcquireShared方法,只有当子类Sync的tryAcquireShared方法返回大于0的值时才算获取同步状态成功,
否则就会一直在死循环中不断重试,直到tryAcquireShared方法返回大于等于0的值,而Sync的tryAcquireShared方法只有当AQS中的state值为0时才会返回1,否则都返回-1,也就相当于只有当AQS的state值为0时,await方法才会执行成功,否则
就会一直处于死循环中不断重试。
总结:
CountDownLatch实际完全依靠AQS的共享式获取和释放同步状态来实现,初始化时定义AQS的state值,每调用countDown实际就是释放一次AQS的共享式同步状态,await方法实际就是尝试获取AQS的同步状态,只有当同步状态值为0时才能获取成功。
二、CyclicBarrier
2.1、CyclicBarrier的使用
CyclicBarrier可以理解为一个循环同步屏障,定义一个同步屏障之后,当一组线程都全部达到同步屏障之前都会被阻塞,直到最后一个线程达到了同步屏障之后才会被打开,其他线程才可继续执行。
还是以dota、LoL和王者荣耀为例,当第一个玩家准备了之后,还需要等待其他4个玩家都准备,游戏才可继续,否则准备的玩家会被一直处于等待状态,只有当最后一个玩家准备了之后,游戏才会继续执行。
CyclicBarrier使用案例如下:
复制代码
1 public static void CyclicBarrierTest() throws Exception {
2 CyclicBarrier barrier = new CyclicBarrier(5);//定义需要达到同步屏障的线程数量
3 for (int i=0;i<12;i++){
4 Thread.sleep(1000L);
5 new Thread(new Runnable() {
6 @Override
7 public void run() {
8 try {
9 System.out.println("线程"+Thread.currentThread().getName()+"组队准备,当前" + (barrier.getNumberWaiting()+1) + "人已准备");
10 barrier.await();/**线程进入等待,直到最后一个线程达到同步屏障*/
11 System.out.println("线程:"+Thread.currentThread().getName()+"开始组队游戏");
12 } catch (InterruptedException e) {
13 e.printStackTrace();
14 } catch (BrokenBarrierException e) {
15 e.printStackTrace();
16 }
17 }
18 }).start();
19 }
20 }
复制代码
执行结果如下:
复制代码
1 线程Thread-0组队准备,当前1人已准备
2 线程Thread-1组队准备,当前2人已准备
3 线程Thread-2组队准备,当前3人已准备
4 线程Thread-3组队准备,当前4人已准备
5 线程Thread-4组队准备,当前5人已准备
6 线程:Thread-4开始组队游戏
7 线程:Thread-0开始组队游戏
8 线程:Thread-1开始组队游戏
9 线程:Thread-2开始组队游戏
10 线程:Thread-3开始组队游戏
11 线程Thread-5组队准备,当前1人已准备
12 线程Thread-6组队准备,当前2人已准备
13 线程Thread-7组队准备,当前3人已准备
14 线程Thread-8组队准备,当前4人已准备
15 线程Thread-9组队准备,当前5人已准备
16 线程:Thread-9开始组队游戏
17 线程:Thread-5开始组队游戏
18 线程:Thread-7开始组队游戏
19 线程:Thread-6开始组队游戏
20 线程:Thread-8开始组队游戏
21 线程Thread-10组队准备,当前1人已准备
22 线程Thread-11组队准备,当前2人已准备
复制代码
本案例中定义了达到同步屏障的线程为5个,每当一个线程调用了barrier.await()方法之后表示该线程已达到屏障,此时当前线程会被阻塞,只有当最后一个线程调用了await方法之后,被阻塞的其他线程才会被唤醒继续执行。
另外CyclicBarrier是循环同步屏障,同步屏障打开之后立马会继续计数,等待下一组线程达到同步屏障。而CountDownLatch仅单次有效。
2.2、CyclicBarrier的实现原理
先看下CyclicBarrier的构造方法
复制代码
1 public CyclicBarrier(int parties) {
2 this(parties, null);
3 }
4
5 public CyclicBarrier(int parties, Runnable barrierAction) {
6 if (parties <= 0) throw new IllegalArgumentException();
7 this.parties = parties;//同步屏障总需线程数
8 this.count = parties;//当前剩余需要达到的线程数
9 this.barrierCommand = barrierAction;
10 }
复制代码
CyclicBarrier的构造方法没有特殊之处,主要是给两个属性parties(总线程数)、count(当前剩余线程数)进行赋值,这里需要两个值的原因是CyclicBarrier提供了重置的功能,当调用reset方法重置时就需要将count值再赋值成parties的值
再看下await方法的实现逻辑
复制代码
1 public int await() throws InterruptedException, BrokenBarrierException {
2 try {
3 //调用私有方法dowait方法
4 return dowait(false, 0L);
5 } catch (TimeoutException toe) {
6 throw new Error(toe); // cannot happen
7 }
8 }
9
10 /**
11 * Main barrier code, covering the various policies.
12 */
13 private int dowait(boolean timed, long nanos)
14 throws InterruptedException, BrokenBarrierException,
15 TimeoutException {
16 //CyclicBarrier有个ReentrantLock属性的lock
17 final ReentrantLock lock = this.lock;
18 //加锁操作
19 lock.lock();
20 try {
21 final Generation g = generation;
22
23 if (g.broken)
24 throw new BrokenBarrierException();
25 //响应线程中断
26 if (Thread.interrupted()) {
27 breakBarrier();
28 throw new InterruptedException();
29 }
30 //count自减操作
31 int index = --count;
32 //判断当前还需达到同步屏障的线程数是否为0
33 if (index == 0) { // tripped
34 boolean ranAction = false;
35 try {
36 //barrierCommand是同步屏障打开之后需要执行的Runnable对象
37 final Runnable command = barrierCommand;
38 if (command != null)
39 //如果Runnable对象不为空直接执行Runnable线程任务
40 command.run();
41 ranAction = true;
42 //本次同步屏障全部达成,唤醒所有线程并开始下一次同步屏障
43 nextGeneration();
44 return 0;
45 } finally {
46 if (!ranAction)
47 breakBarrier();
48 }
49 }
50
51 // loop until tripped, broken, interrupted, or timed out
52 for (;?? {
53 try {
54 if (!timed)
55 //调用Condition对象的await方法使当前线程进入等待状态
56 trip.await();
57 else if (nanos > 0L)
58 nanos = trip.awaitNanos(nanos);
59 } catch (InterruptedException ie) {
60 if (g == generation && ! g.broken) {
61 breakBarrier();
62 throw ie;
63 } else {
64 // We‘re about to finish waiting even if we had not
65 // been interrupted, so this interrupt is deemed to
66 // "belong" to subsequent execution.
67 Thread.currentThread().interrupt();
68 }
69 }
70
71 if (g.broken)
72 throw new BrokenBarrierException();
73
74 if (g != generation)
75 return index;
76
77 if (timed && nanos <= 0L) {
78 breakBarrier();
79 throw new TimeoutException();
80 }
81 }
82 } finally {
83 lock.unlock();
84 }
85 }
86
87 private void nextGeneration() {
88 // signal completion of last generation
89 //唤醒所有线程
90 trip.signalAll();
91 // set up next generation
92 count = parties;
93 generation = new Generation();
94 }
复制代码
从源码可以看出CyclicBarrier的实现原理主要是通过ReentrantLock和Condition来实现的,主要实现流程如下:
1、创建CyclicBarrier时定义了CyclicBarrier对象需要达到的线程数count
2、每当一个线程执行了await方法时,需要先通过ReentrantLock进行加锁操作,然后对count进行自减操作,操作成功则判断当前count是否为0;
3、如果当前count不为0则调用Condition的await方法使当前线程进入等待状态;
4、如果当前count为0则表示同步屏障已经完全,此时调用Condition的signalAll方法唤醒之前所有等待的线程,并开启循环的下一次同步屏障功能;
5、唤醒其他线程之后,其他线程继续执行剩余的逻辑。
2.3、通过Synchronized和wait/notify实现CyclicBarrier
通过分析了解了CyclicBarrier是通过ReentrantLock和Condition来实现的,而ReentrantLock+Condition在使用上基本上等同于Synchronized+wait/notify,既然如此就可以通过Synchronized+wait/notify来自定义一个CyclicBarrier,话不多说,代码如下:
复制代码
public class MyCyclicBarrier {
public MyCyclicBarrier(int parties){
this.parties = parties;
this.count = parties;
}
/**当前剩余数量*/
private int count;
/**设置同数量*/
private int parties;
/**获取当前已准备数量*/
public int getNumberWaiting(){
return parties-count;
}
public int await(){
synchronized (this){
int rest = --count;
if(rest==0){
//当剩余数量为0时表示所有线程达到屏障,则重置同步屏障并唤醒所有线程
this.count = parties;
this.notifyAll();
return 0;
}
try {
//当剩余数量大于0时,线程进入等待状态
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return count;
}
/**测试 Main 方法*/
public static void main(String[] args) throws InterruptedException {
MyCyclicBarrier barrier = new MyCyclicBarrier(5);//定义需要达到同步屏障的线程数量
for (int i=0;i<12;i++){
Thread.sleep(1000L);
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程"+Thread.currentThread().getName()+"组队准备,当前" + (barrier.getNumberWaiting()+1) + "人已准备");
barrier.await();
System.out.println("线程:"+Thread.currentThread().getName()+"开始组队游戏");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
}
复制代码
执行结果如下:
复制代码
1 线程Thread-0组队准备,当前1人已准备
2 线程Thread-1组队准备,当前2人已准备
3 线程Thread-2组队准备,当前3人已准备
4 线程Thread-3组队准备,当前4人已准备
5 线程Thread-4组队准备,当前5人已准备
6 线程:Thread-4开始组队游戏
7 线程:Thread-3开始组队游戏
8 线程:Thread-0开始组队游戏
9 线程:Thread-1开始组队游戏
10 线程:Thread-2开始组队游戏
11 线程Thread-5组队准备,当前1人已准备
12 线程Thread-6组队准备,当前2人已准备
13 线程Thread-7组队准备,当前3人已准备
14 线程Thread-8组队准备,当前4人已准备
15 线程Thread-9组队准备,当前5人已准备
16 线程:Thread-9开始组队游戏
17 线程:Thread-7开始组队游戏
18 线程:Thread-5开始组队游戏
19 线程:Thread-6开始组队游戏
20 线程:Thread-8开始组队游戏
21 线程Thread-10组队准备,当前1人已准备
22 线程Thread-11组队准备,当前2人已准备
复制代码
可以看出实现的效果和CyclicBarrier实现的效果完全一样
三、Semaphore
3.1、Semaphore的使用
Semaphore字面意思是信号量,实际可以看作是一个限流器,初始化Semaphore时就定义好了最大通行证数量,每次调用时调用方法来消耗,业务执行完毕则释放通行证,如果通行证消耗完,再获取通行证时就需要阻塞线程直到有通行证可以获取。
比如银行柜台的窗口,一共有5个窗口可以使用,当窗口都被占用时,后面来的人就需要排队等候,直到有窗口用户办理完业务离开之后后面的人才可继续争取。模拟代码如下:
复制代码
1 public static void semaphoreTest() throws InterruptedException {
2 int count = 5;
3 Semaphore semaphore = new Semaphore(count);
4 System.out.println("初始化" + count + "个银行柜台窗口");
5 for (int i=0;i<10;i++){
6 Thread.sleep(1000L);
7 new Thread(new Runnable() {
8 @Override
9 public void run() {
10 try {
11 System.out.println("用户"+Thread.currentThread().getName()+"占用窗口");
12 semaphore.acquire(1);//获取许可证
13 /**用户办理业务需要消耗一定时间*/
14 System.out.println("用户"+Thread.currentThread().getName()+"开始办理业务");
15 Thread.sleep(5000L);
16 semaphore.release();//释放许可证
17 System.out.println("用户"+Thread.currentThread().getName()+"离开窗口");
18 } catch (Exception e) {
19 e.printStackTrace();
20 }
21 }
22 }).start();
23 }
24 }
复制代码
执行结果如下:
复制代码
1 初始化5个银行柜台窗口
2 用户Thread-0占用窗口
3 用户Thread-0开始办理业务
4 用户Thread-1占用窗口
5 用户Thread-1开始办理业务
6 用户Thread-2占用窗口
7 用户Thread-2开始办理业务
8 用户Thread-3占用窗口
9 用户Thread-3开始办理业务
10 用户Thread-4占用窗口
11 用户Thread-4开始办理业务
12 用户Thread-0离开窗口
13 用户Thread-5占用窗口
14 用户Thread-5开始办理业务
15 用户Thread-1离开窗口
16 用户Thread-6占用窗口
17 用户Thread-6开始办理业务
18 用户Thread-2离开窗口
19 用户Thread-7占用窗口
20 用户Thread-7开始办理业务
21 用户Thread-3离开窗口
22 用户Thread-8占用窗口
23 用户Thread-8开始办理业务
24 用户Thread-4离开窗口
25 用户Thread-9占用窗口
26 用户Thread-9开始办理业务
27 用户Thread-5离开窗口
28 用户Thread-6离开窗口
29 用户Thread-7离开窗口
30 用户Thread-8离开窗口
31 用户Thread-9离开窗口
复制代码
可以看出前5个线程可以直接占用窗口,但是后5个线程需要等待前面的线程离开了窗口之后才可占用窗口。
Semaphore调用acquire方法获取许可证,可以同时获取多个,但是也需要对应的释放多个,否则会造成其他线程获取不到许可证的情况。一旦许可证被消耗完,那么线程就需要被阻塞,直到许可证被释放才可继续执行。
另外Semaphore还具有公平模式和非公平模式两种用法,公平模式则遵循FIFO原则先排队的线程先拿到许可证;非公平模式则自行争取。
3.2、Semaphore实现原理
Semaphore的构造方法
复制代码
1 public Semaphore(int permits) {
2 //非公平模式
3 sync = new NonfairSync(permits);
4 }
5
6 public Semaphore(int permits, boolean fair) {
7 //公平模式
8 sync = fair ? new FairSync(permits) : new NonfairSync(permits);
9 }
复制代码
构造方法只有两个参数,一个是许可证总数量,一个是是否为公平模式;默认是非公平模式
Semaphore的实现全部是通过其内部类Sync来实现了,Sync也是AQS的子类,Semaphore的实现方式基本上和ReentrantLock的实现原理如出一辙。
公平模式实现原理:
复制代码
1 FairSync(int permits) {
2 //初始化AQS的state的值
3 super(permits);
4 }
5
6 protected int tryAcquireShared(int acquires) {
7 for (;?? {
8 //当首节点的后继节点不是当前线程时直接return -1
9 if (hasQueuedPredecessors())
10 return -1;
11 //获取当前AQS的state值
12 int available = getState();
13 //将state减去需要占用的许可证数量得到剩余的许可证数量
14 int remaining = available - acquires;
15 /**
16 * 当remaining<0时返回remaining则表示获取许可证失败,会进入AQS的死循环不停重试
17 * 当remain>0时,并且CAS成功了则返回remaining,表示获取许可证成功了
18 * */
19 if (remaining < 0 ||
20 compareAndSetState(available, remaining))
21 return remaining;
22 }
23 }
复制代码
公平模式就是当当前线程是AQS同步队列首节点的后继节点时才有权利尝试获取共享式的同步状态,并将同步状态值减去需要占用的许可证数量,如果剩余许可证数量小于0则表示获取失败进入AQS的死循环不停重试;
如果许可证数量大于0并且CAS设置成功了,则返回剩余许可证数量表示抢占许可证成功;
非公平模式实现原理:
看我公平模式的实现基本是就可以猜到非公平模式是如何实现的,只是会少了一步判断当前节点是否是首节点的后继节点而已。
复制代码
1 static final class NonfairSync extends Sync {
2 private static final long serialVersionUID = -2694183684443567898L;
3
4 NonfairSync(int permits) {
5 super(permits);
6 }
7
8 protected int tryAcquireShared(int acquires) {
9 return nonfairTryAcquireShared(acquires);
10 }
11 }
12
13 final int nonfairTryAcquireShared(int acquires) {
14 for (;?? {
15 int available = getState();
16 int remaining = available - acquires;
17 if (remaining < 0 ||
18 compareAndSetState(available, remaining))
19 return remaining;
20 }
21 }
复制代码
了解完Semaphore的公平模式和非公平模式的占有许可证的方法,再分析释放许可证的方法,不过可以先自行猜测下会是如何实现的,既然获取许可证是通过state字段不断减少来实现的,那么毫无疑问释放许可证就肯定是不断给state增加来实现的。
释放许可证源码如下:
复制代码
1 /**尝试释放许可证*/
2 protected final boolean tryReleaseShared(int releases) {
3 for (;?? {
4 int current = getState();
5 int next = current + releases;
6 if (next < current) // overflow
7 throw new Error("Maximum permit count exceeded");
8 if (compareAndSetState(current, next))
9 return true;
10 }
11 }
复制代码
Semaphore的释放许可证实际就是调用AQS的共享式释放同步状态的方法,然后调用内部类Sync重写的AQS的tryReleaseShared方法,实现逻辑就是不停CAS设置state的值加上需要释放的数量,直到CAS成功。这里少了AQS的逻辑解析,有兴趣可自行回顾AQS的共享式释放同步状态的实现原理。
四、Extra Knowledge
4.1、CountDownLatch 和 CyclicBarrier的区别?
CountDownLatch和CyclicBarrier实现的效果看似都是某个线程等待一组线程达到条件之后才可继续执行,但是实际上两者存在很多区别。
1、CountDownLatch阻塞的是调用await()的线程,不会阻塞达到条件的线程;CyclicBarrier阻塞的是达到同步屏障的所有线程
2、CountDownLatch采用倒数计数,定义数量之后,每当一个线程达到要求之后就减一;CyclicBarrier是正数计数,当数量达到定义的数量之后就打开同步屏障
3、CountDownLatch仅单次有效,不可重复使用;CyclicBarrir可以循环重复使用
4、CountDownLatch定义的数量和实际线程数无关,可以有一个线程执行多次countDown();CyclicBarrier定义的数量和实际线程数一致,必须由多个线程都达到要求执行才行(线程调用await()方法之后就会被阻塞,想调用多次也不行的)
5、CountDownLatch是通过内部类Sync继承AQS来实现的;CyclicBarrier是通过重入锁ReentrantLock来实现的
6、CountDownLatch不可重置;CyclicBarrier可以调用reset方法进行重置
以上是关于PHP并发包5--同步工具CountDownLatchCyclicBarrierSemaphore的实现原理解析的主要内容,如果未能解决你的问题,请参考以下文章