CountDownLatchCyclicBarrierSemaphore源码解析
Posted alimayun
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CountDownLatchCyclicBarrierSemaphore源码解析相关的知识,希望对你有一定的参考价值。
CountDownLatch
1 前言
CountDownLatch
是一种同步辅助工具类,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成为止。(源码分析基于JDK1.8) CountDownLatch需要用给定的闩锁计数count初始化。await
方法使当前线程阻塞(每执行一次countDown
方法就将闩锁计数减1),直到闩锁计数达到零时(所有因此阻塞等待的线程都)才会被唤醒。CountDownLatch是一次性使用的同步工具,闩锁计数无法重置,如果需要重置计数,可能使用CyclicBarrier
更合适。
2 用法示例
我们需要解析一个Excel里多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的做法是使用join()方法。
public class JoinCountDownLatchTest { public static void main(String[] args) throws InterruptedException { Thread parser1 = new Thread(new Runnable() { @Override public void run() { System.out.println("parser1 finish"); } }); Thread parser2 = new Thread(new Runnable() { @Override public void run() { System.out.println("parser2 finish"); } }); parser1.start(); parser2.start(); parser1.join(); parser2.join(); System.out.println("all parser finish"); } }
CountDownLatch
可以实现join类似的功能,但它更强大,它提供了很多API方法,能够实现更精准的控制。
CountDownLatch的构造方法必须传入一个int类型的参数,这个参数作为闩锁的计数器。
CountDownLatch的countDown
和await
方法一般都要配合使用。await
方法(休眠)阻塞当前线程,而每调用一次countDown
方法,闩锁计数就减1,当其减为0时,当前线程就被唤醒、await方法得以返回。
class CountDownLatchTest { static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { Thread parser1 = new Thread(() -> { System.out.println("parser1 finish"); c.countDown(); }); Thread parser2 = new Thread(new Runnable() { @Override public void run() { System.out.println("parser2 finish"); c.countDown(); } }); parser1.start(); parser2.start(); c.await(); System.out.println("all parser finish"); } }
打印结果:
parser1 finish
parser2 finish
all parser finish
若c.await();
被注释掉,就不能保证打印的先后顺序,输出结果如下:
all parser finish
parser1 finish
parser2 finish
2) 示例2
这里有两个类Driver和Worker,分别表示驱动者、工作者线程。这里使用了两个CountDownLatch对象,第一个表示启动信号,可防止任何工作者线程Worker前进处理,直到驱动者Driver为它们做好准备为止;第二个表示完成信号,允许驱动者Driver等到所有工作者线程Worker都完成任务为止。
class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(N); ? for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); ? doSomethingElse(); // don‘t let run yet 做准备 startSignal.countDown(); // let all threads proceed doSomethingElse(); doneSignal.await(); // wait for all to finish } } ? class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } ? void doWork() { ... } }
3 实现分析
CountDownLatch的实现主要基于同步器AbstractQueuedSynchronizer
,它利用AQS实现了一个共享锁. CountDownLatch主要有一个Sync
类型成员变量sync, Sync
是继承抽象类AbstractQueuedSynchronizer的静态内部类。
private final Sync sync;
1) 构造方法CountDownLatch(int)
CountDownLatch的构造方法主要是执行this.sync = new Sync(count)
对sync进行实例化, 而Sync(int)
又将父类AbstractQueuedSynchronizer
的实例变量state
设置为指定的count。
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count);//实例化 } Sync(int count) { setState(count);//将`AbstractQueuedSynchronizer`的state设为count }
2) 静态内部类Sync
Sync主要重写了父类的tryAcquireShared
、tryReleaseShared
方法,这两个方法都是实现共享锁所必须重写的相关方法,其作用分别是尝试获取共享状态、尝试释放共享状态,两者刚好配对。
protected int tryAcquireShared(int acquires) { //state为0,闩锁计数为0 ,返回1,获取共享状态成功 //反之闩锁计数不为0,返回-1,获取共享状态失败。 return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) //state已经为0,非法状态返回false.(只有在已获取锁,即state非零时,才有释放锁的说法) return false; int nextc = c-1; if (compareAndSetState(c, nextc))//cas自旋将state减1 return nextc == 0;//state自减1后为零,返回true,可释放锁。反之返回false,还不能释放锁。 } }
另外Sync还提供了一个方法getCount
,返回当前剩余的闩锁计数,它直接调用父类AQS的getState
实现。
int getCount() { return getState(); }
3) await
await使当前线程休眠等待,直到count减少至0或线程中断。
await调用了AQS的acquireSharedInterruptibly方法,acquireSharedInterruptibly获取共享锁并响应中断。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
await(long , TimeUnit )
是await()的超时版本。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
我们用以下程序来分析源码,t1 和 t2 负责调用 countDown() 方法,t3 和 t4 调用 await 方法阻塞:
public class CountDownLatchDemo { public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(2); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(5000); } catch (InterruptedException ignore) { } // 休息 5 秒后(模拟线程工作了 5 秒),调用 countDown() latch.countDown(); } }, "t1"); Thread t2 = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(10000); } catch (InterruptedException ignore) { } // 休息 10 秒后(模拟线程工作了 10 秒),调用 countDown() latch.countDown(); } }, "t2"); t1.start(); t2.start(); Thread t3 = new Thread(new Runnable() { @Override public void run() { try { // 阻塞,等待 state 减为 0 latch.await(); System.out.println("线程 t3 从 await 中返回了"); } catch (InterruptedException e) { System.out.println("线程 t3 await 被中断"); Thread.currentThread().interrupt(); } } }, "t3"); Thread t4 = new Thread(new Runnable() { @Override public void run() { try { // 阻塞,等待 state 减为 0 latch.await(); System.out.println("线程 t4 从 await 中返回了"); } catch (InterruptedException e) { System.out.println("线程 t4 await 被中断"); Thread.currentThread().interrupt(); } } }, "t4"); t3.start(); t4.start(); } }
上述程序,大概在过了 10 秒左右的时候,会输出:
线程 t3 从 await 中返回了 线程 t4 从 await 中返回了 // 这两条输出,顺序不是绝对的 // 后面的分析,我们假设 t3 先进入阻塞队列
接下来,我们按照流程一步一步走:先 await 等待,然后被唤醒,await 方法返回。
首先,我们来看 await() 方法,它代表线程阻塞,等待 state 的值减为 0。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 假设state初始化为2,没有线程countDown()完,那么此时tryAcquireShared一定等于-1 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 只有当 state == 0 的时候,这个方法才会返回 1 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
从方法名我们就可以看出,这个方法是获取共享锁,并且此方法是可中断的(中断的时候抛出 InterruptedException 退出这个方法)。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 1. 入队 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 同上,只要 state 不等于 0,那么这个方法返回 -1 int r = tryAcquireShared(arg); // r=-1时,这里if不会进入 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 2. 这和第一篇AQS里面代码一样,修改前驱节点的waitStatus 为-1,同时挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
我们来仔细分析这个方法,线程 t3 经过第 1 步 第4行 addWaiter 入队以后,我们应该可以得到这个:
由于 tryAcquireShared 这个方法会返回 -1,所以 if (r >= 0) 这个分支不会进去。到 shouldParkAfterFailedAcquire 的时候,t3 将 head 的 waitStatus 值设置为 -1,如下:
然后进入到 parkAndCheckInterrupt 的时候,t3 挂起。
我们再分析 t4 入队,t4 会将前驱节点 t3 所在节点的 waitStatus 设置为 -1,t4 入队后,应该是这样的:
然后,t4 也挂起。接下来,t3 和 t4 就等待唤醒了。
接下来,我们来看唤醒的流程,我们假设用 10 初始化 CountDownLatch。
4) countDown
countDown将闩锁计数递减1,若递减后为0就将唤醒所有阻塞等待的线程。如果闩锁的计数(递减前)已经为零,就啥也不做,恰好与上面tryReleaseShared
方法体中的if (c == 0) return false;
所对应。
public void countDown() { sync.releaseShared(1); }
public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { // 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true // 否则只是简单的 state = state - 1 那么 countDown 方法就结束了 if (tryReleaseShared(arg)) { // 唤醒 await 的线程 doReleaseShared(); return true; } return false; } // 这个方法很简单,用自旋的方法实现 state 减 1 protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; //通过CAS将state的值减1,失败就不会进入return,继续for循环,直至CAS成功 if (compareAndSetState(c, nextc)) //state减到0就返回true,否则返回false return nextc == 0; } }
countDown 方法就是每次调用都将 state 值减 1,如果 state 减到 0 了,那么就调用下面的方法进行唤醒阻塞队列中的线程:
// 调用这个方法的时候,state == 0 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点 // 在这里,也就是唤醒 t3 , t3的await()方法可以接着运行了 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo continue; // loop on failed CAS } //此时 h == head 说明被唤醒的 t3线程 还没有执行到await()方法中的setHeadAndPropagate(node, r)这一步,则此时循环结束; //如果执行完setHeadAndPropagate(node, r),则head就为t3了,这里的h和head就不相等,会继续循环 if (h == head) // loop if head changed break; } }
一旦 t3 被唤醒后,我们继续回到 await 的这段代码,在第24行代码 parkAndCheckInterrupt 返回继续接着运行,我们先不考虑中断的情况:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //p表示当前节点的前驱节点 final Node p = node.predecessor(); //此时被唤醒的是之前head的后继节点,所以此线程的前驱节点是head if (p == head) { //此时state已经为0,r为1 int r = tryAcquireShared(arg); if (r >= 0) { // 2. 这里将唤醒t3的后续节点t4,以此类推,t4被唤醒后,会在t4的await中唤醒t4的后续节点 setHeadAndPropagate(node, r); // 将已经唤醒的t3节点从队列中去除 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && // 1. 唤醒后这个方法返回 parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
接下来,t3 会循环一次进到 setHeadAndPropagate(node, r) 这个方法,先把 head 给占了,然后唤醒队列中其他的线程:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); // 下面说的是,唤醒当前 node 之后的节点,即 t3 已经醒了,马上唤醒 t4 // 类似的,如果 t4 后面还有 t5,那么 t4 醒了以后,马上将 t5 给唤醒了 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 又是这个方法,只是现在的 head 已经不是原来的空节点了,是 t3 的节点了 doReleaseShared(); } }
又回到这个方法了,那么接下来,我们好好分析 doReleaseShared 这个方法,我们根据流程,头节点 head 此时是 t3 节点了:
// 调用这个方法的时候,state == 0 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // t4 将头节点(此时是 t3)的 waitStatus 设置为 Node.SIGNAL(-1) 了 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点 // 在这里,也就是唤醒 t4 unparkSuccessor(h); } else if (ws == 0 && // 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环 // 否则,就是 head 没变,那么退出循环, // 退出循环是不是意味着阻塞队列中的其他节点就不唤醒了?当然不是,唤醒的线程之后还是会在await()方法中调用此方法接着唤醒后续节点,比如现在t4已经被唤醒,然后他会继续setHeadAndProtogate-->doReleaseShared唤醒其他线程 if (h == head) // loop if head changed break; } }
5) getCount
getCount用于查询当前的闩锁计数
public long getCount() { return sync.getCount(); }
总结
总的来说,CountDownLatch 就是线程入队阻塞,依次唤醒的过程
使用过程会执行以下操作:
1.当创建一个CountDownLatch 的实例后,AQS中的state会设置一个正整数
2.一个线程调用await(),当前线程加入到阻塞队列中,当前线程挂起
3.一个线程调用countDown()唤醒方法,state减1,直到state被减为0时,唤醒阻塞队列中第一个等待节点中的线程
4.第一个线程被唤醒后,当前线程继续执行await()方法,将当前线程设置为head,并在此方法中唤醒head的下一个节点,依次类推
1 前言
CountDownLatch
是一种同步辅助工具类,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成为止。(源码分析基于JDK1.8) CountDownLatch需要用给定的闩锁计数count初始化。await
方法使当前线程阻塞(每执行一次countDown
方法就将闩锁计数减1),直到闩锁计数达到零时(所有因此阻塞等待的线程都)才会被唤醒。CountDownLatch是一次性使用的同步工具,闩锁计数无法重置,如果需要重置计数,可能使用CyclicBarrier
更合适。
2 用法示例
我们需要解析一个Excel里多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的做法是使用join()方法。
public class JoinCountDownLatchTest { public static void main(String[] args) throws InterruptedException { Thread parser1 = new Thread(new Runnable() { @Override public void run() { System.out.println("parser1 finish"); } }); Thread parser2 = new Thread(new Runnable() { @Override public void run() { System.out.println("parser2 finish"); } }); parser1.start(); parser2.start(); parser1.join(); parser2.join(); System.out.println("all parser finish"); } }
CountDownLatch
可以实现join类似的功能,但它更强大,它提供了很多API方法,能够实现更精准的控制。
CountDownLatch的构造方法必须传入一个int类型的参数,这个参数作为闩锁的计数器。
CountDownLatch的countDown
和await
方法一般都要配合使用。await
方法(休眠)阻塞当前线程,而每调用一次countDown
方法,闩锁计数就减1,当其减为0时,当前线程就被唤醒、await方法得以返回。
class CountDownLatchTest { static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { Thread parser1 = new Thread(() -> { System.out.println("parser1 finish"); c.countDown(); }); Thread parser2 = new Thread(new Runnable() { @Override public void run() { System.out.println("parser2 finish"); c.countDown(); } }); parser1.start(); parser2.start(); c.await(); System.out.println("all parser finish"); } }
打印结果:
parser1 finish
parser2 finish
all parser finish
若c.await();
被注释掉,就不能保证打印的先后顺序,输出结果如下:
all parser finish
parser1 finish
parser2 finish
2) 示例2
这里有两个类Driver和Worker,分别表示驱动者、工作者线程。这里使用了两个CountDownLatch对象,第一个表示启动信号,可防止任何工作者线程Worker前进处理,直到驱动者Driver为它们做好准备为止;第二个表示完成信号,允许驱动者Driver等到所有工作者线程Worker都完成任务为止。
class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(N); ? for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); ? doSomethingElse(); // don‘t let run yet 做准备 startSignal.countDown(); // let all threads proceed doSomethingElse(); doneSignal.await(); // wait for all to finish } } ? class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } ? void doWork() { ... } }
3 实现分析
CountDownLatch的实现主要基于同步器AbstractQueuedSynchronizer
,它利用AQS实现了一个共享锁. CountDownLatch主要有一个Sync
类型成员变量sync, Sync
是继承抽象类AbstractQueuedSynchronizer的静态内部类。
private final Sync sync;
1) 构造方法CountDownLatch(int)
CountDownLatch的构造方法主要是执行this.sync = new Sync(count)
对sync进行实例化, 而Sync(int)
又将父类AbstractQueuedSynchronizer
的实例变量state
设置为指定的count。
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count);//实例化 } Sync(int count) { setState(count);//将`AbstractQueuedSynchronizer`的state设为count }
2) 静态内部类Sync
Sync主要重写了父类的tryAcquireShared
、tryReleaseShared
方法,这两个方法都是实现共享锁所必须重写的相关方法,其作用分别是尝试获取共享状态、尝试释放共享状态,两者刚好配对。
protected int tryAcquireShared(int acquires) { //state为0,闩锁计数为0 ,返回1,获取共享状态成功 //反之闩锁计数不为0,返回-1,获取共享状态失败。 return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) //state已经为0,非法状态返回false.(只有在已获取锁,即state非零时,才有释放锁的说法) return false; int nextc = c-1; if (compareAndSetState(c, nextc))//cas自旋将state减1 return nextc == 0;//state自减1后为零,返回true,可释放锁。反之返回false,还不能释放锁。 } }
另外Sync还提供了一个方法getCount
,返回当前剩余的闩锁计数,它直接调用父类AQS的getState
实现。
int getCount() { return getState(); }
3) await
await使当前线程休眠等待,直到count减少至0或线程中断。
await调用了AQS的acquireSharedInterruptibly方法,acquireSharedInterruptibly获取共享锁并响应中断。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
await(long , TimeUnit )
是await()的超时版本。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
我们用以下程序来分析源码,t1 和 t2 负责调用 countDown() 方法,t3 和 t4 调用 await 方法阻塞:
public class CountDownLatchDemo { public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(2); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(5000); } catch (InterruptedException ignore) { } // 休息 5 秒后(模拟线程工作了 5 秒),调用 countDown() latch.countDown(); } }, "t1"); Thread t2 = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(10000); } catch (InterruptedException ignore) { } // 休息 10 秒后(模拟线程工作了 10 秒),调用 countDown() latch.countDown(); } }, "t2"); t1.start(); t2.start(); Thread t3 = new Thread(new Runnable() { @Override public void run() { try { // 阻塞,等待 state 减为 0 latch.await(); System.out.println("线程 t3 从 await 中返回了"); } catch (InterruptedException e) { System.out.println("线程 t3 await 被中断"); Thread.currentThread().interrupt(); } } }, "t3"); Thread t4 = new Thread(new Runnable() { @Override public void run() { try { // 阻塞,等待 state 减为 0 latch.await(); System.out.println("线程 t4 从 await 中返回了"); } catch (InterruptedException e) { System.out.println("线程 t4 await 被中断"); Thread.currentThread().interrupt(); } } }, "t4"); t3.start(); t4.start(); } }
上述程序,大概在过了 10 秒左右的时候,会输出:
线程 t3 从 await 中返回了 线程 t4 从 await 中返回了 // 这两条输出,顺序不是绝对的 // 后面的分析,我们假设 t3 先进入阻塞队列
接下来,我们按照流程一步一步走:先 await 等待,然后被唤醒,await 方法返回。
首先,我们来看 await() 方法,它代表线程阻塞,等待 state 的值减为 0。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 假设state初始化为2,没有线程countDown()完,那么此时tryAcquireShared一定等于-1 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 只有当 state == 0 的时候,这个方法才会返回 1 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
从方法名我们就可以看出,这个方法是获取共享锁,并且此方法是可中断的(中断的时候抛出 InterruptedException 退出这个方法)。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 1. 入队 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 同上,只要 state 不等于 0,那么这个方法返回 -1 int r = tryAcquireShared(arg); // r=-1时,这里if不会进入 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 2. 这和第一篇AQS里面代码一样,修改前驱节点的waitStatus 为-1,同时挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
我们来仔细分析这个方法,线程 t3 经过第 1 步 第4行 addWaiter 入队以后,我们应该可以得到这个:
由于 tryAcquireShared 这个方法会返回 -1,所以 if (r >= 0) 这个分支不会进去。到 shouldParkAfterFailedAcquire 的时候,t3 将 head 的 waitStatus 值设置为 -1,如下:
然后进入到 parkAndCheckInterrupt 的时候,t3 挂起。
我们再分析 t4 入队,t4 会将前驱节点 t3 所在节点的 waitStatus 设置为 -1,t4 入队后,应该是这样的:
然后,t4 也挂起。接下来,t3 和 t4 就等待唤醒了。
接下来,我们来看唤醒的流程,我们假设用 10 初始化 CountDownLatch。
4) countDown
countDown将闩锁计数递减1,若递减后为0就将唤醒所有阻塞等待的线程。如果闩锁的计数(递减前)已经为零,就啥也不做,恰好与上面tryReleaseShared
方法体中的if (c == 0) return false;
所对应。
public void countDown() { sync.releaseShared(1); }
public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { // 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true // 否则只是简单的 state = state - 1 那么 countDown 方法就结束了 if (tryReleaseShared(arg)) { // 唤醒 await 的线程 doReleaseShared(); return true; } return false; } // 这个方法很简单,用自旋的方法实现 state 减 1 protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; //通过CAS将state的值减1,失败就不会进入return,继续for循环,直至CAS成功 if (compareAndSetState(c, nextc)) //state减到0就返回true,否则返回false return nextc == 0; } }
countDown 方法就是每次调用都将 state 值减 1,如果 state 减到 0 了,那么就调用下面的方法进行唤醒阻塞队列中的线程:
// 调用这个方法的时候,state == 0 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点 // 在这里,也就是唤醒 t3 , t3的await()方法可以接着运行了 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo continue; // loop on failed CAS } //此时 h == head 说明被唤醒的 t3线程 还没有执行到await()方法中的setHeadAndPropagate(node, r)这一步,则此时循环结束; //如果执行完setHeadAndPropagate(node, r),则head就为t3了,这里的h和head就不相等,会继续循环 if (h == head) // loop if head changed break; } }
一旦 t3 被唤醒后,我们继续回到 await 的这段代码,在第24行代码 parkAndCheckInterrupt 返回继续接着运行,我们先不考虑中断的情况:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //p表示当前节点的前驱节点 final Node p = node.predecessor(); //此时被唤醒的是之前head的后继节点,所以此线程的前驱节点是head if (p == head) { //此时state已经为0,r为1 int r = tryAcquireShared(arg); if (r >= 0) { // 2. 这里将唤醒t3的后续节点t4,以此类推,t4被唤醒后,会在t4的await中唤醒t4的后续节点 setHeadAndPropagate(node, r); // 将已经唤醒的t3节点从队列中去除 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && // 1. 唤醒后这个方法返回 parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
接下来,t3 会循环一次进到 setHeadAndPropagate(node, r) 这个方法,先把 head 给占了,然后唤醒队列中其他的线程:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); // 下面说的是,唤醒当前 node 之后的节点,即 t3 已经醒了,马上唤醒 t4 // 类似的,如果 t4 后面还有 t5,那么 t4 醒了以后,马上将 t5 给唤醒了 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 又是这个方法,只是现在的 head 已经不是原来的空节点了,是 t3 的节点了 doReleaseShared(); } }
又回到这个方法了,那么接下来,我们好好分析 doReleaseShared 这个方法,我们根据流程,头节点 head 此时是 t3 节点了:
// 调用这个方法的时候,state == 0 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // t4 将头节点(此时是 t3)的 waitStatus 设置为 Node.SIGNAL(-1) 了 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点 // 在这里,也就是唤醒 t4 unparkSuccessor(h); } else if (ws == 0 && // 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环 // 否则,就是 head 没变,那么退出循环, // 退出循环是不是意味着阻塞队列中的其他节点就不唤醒了?当然不是,唤醒的线程之后还是会在await()方法中调用此方法接着唤醒后续节点,比如现在t4已经被唤醒,然后他会继续setHeadAndProtogate-->doReleaseShared唤醒其他线程 if (h == head) // loop if head changed break; } }
5) getCount
getCount用于查询当前的闩锁计数
public long getCount() { return sync.getCount(); }
总结
总的来说,CountDownLatch 就是线程入队阻塞,依次唤醒的过程
使用过程会执行以下操作:
1.当创建一个CountDownLatch 的实例后,AQS中的state会设置一个正整数
2.一个线程调用await(),当前线程加入到阻塞队列中,当前线程挂起
3.一个线程调用countDown()唤醒方法,state减1,直到state被减为0时,唤醒阻塞队列中第一个等待节点中的线程
4.第一个线程被唤醒后,当前线程继续执行await()方法,将当前线程设置为head,并在此方法中唤醒head的下一个节点,依次类推
CyclicBarrier
字面意思是“可重复使用的栅栏”,CyclicBarrier 相比 CountDownLatch 来说,要简单很多,其源码没有什么高深的地方,它是 ReentrantLock 和 Condition 的组合使用。看如下示意图,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。
示例
1、CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
class CyclicBarrierTest { static CyclicBarrier c = new CyclicBarrier(2); ? public static void main(String[] args) { new Thread(() -> { try { System.out.println("子线程到达屏障点"); c.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("所有线程均到达屏障点后,子线程打印" + 1); }).start(); try { System.out.println("主线程到达屏障点"); c.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("所有线程均到达屏障点后,主线程打印" + 2); } }
因为主线程和子线程的调度是由CPU决定,所以字符串“所有线程均到达屏障点后,子线程打印1“、”所有线程均到达屏障点后,主线程打印2“的输出先后顺序不固定。但”子(主)线程到达屏障点“ 打印输出一定先于”所有线程均到达屏障点后,子(主)线程打印1(2)“,因为CyclicBarrier规定“只有所有的线程都到达屏障点时,这些被阻塞线程才能继续执行”。
2、另外CyclicBarrier还有一个带有两个参数的构造方法CyclicBarrier(int parties, Runnable barrierAction),一个表示屏障拦截的线程数,另一个是Runnable类型参数barrierAction 。此barrierAction 在最后一个线程到达屏障点之后但在唤醒所有线程之前被执行。换句话说,在线程到达屏障时,优先执行barrierAction 。此屏障操作可用在任何一线程继续执行之前更新共享状态。
class CyclicBarrierTest { static CyclicBarrier c = new CyclicBarrier(2,()->{ String tName= Thread.currentThread().getName(); String gName= Thread.currentThread().getThreadGroup().getName(); System.out.println("Thread ‘" + tName +"‘ in thread group ‘" +gName+"‘ executes barrier action."); }); ? public static void main(String[] args) { new Thread(() -> { try { System.out.println("子线程到达屏障点"); c.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("所有线程均到达屏障点后,子线程打印" + 1); }).start(); try { System.out.println("主线程到达屏障点"); c.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("所有线程均到达屏障点后,主线程打印" + 2); } }
可以看出barrierAction先于“xxxxxxx1(2)”输出,再次验证了“在线程到达屏障时,优先执行barrierAction”。
public class CyclicBarrier { // 我们说了,CyclicBarrier 是可以重复使用的,我们把每次从开始使用到穿过栅栏当做"一代" private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); // CyclicBarrier 是基于 Condition 的 // Condition 是“条件”的意思,CyclicBarrier 的等待线程通过 barrier 的“条件”是大家都到了栅栏上 private final Condition trip = lock.newCondition(); // 参与的线程数 private final int parties; // 如果设置了这个,代表越过栅栏之前,要执行相应的操作 private final Runnable barrierCommand; // 当前所处的“代” private Generation generation = new Generation(); // 还没有到栅栏的线程数,这个值初始为 parties,然后递减 // 还没有到栅栏的线程数 = parties - 已经到栅栏的数量 private int count; public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
我用一图来描绘下 CyclicBarrier 里面的一些概念:
现在开始分析最重要的等待通过栅栏方法 await 方法:
// 不带超时机制 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } // 带超时机制,如果超时抛出 TimeoutException 异常 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
继续往里看:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 先要获取到锁,然后在 finally 中要记得释放锁 // 如果记得 Condition 部分的话,我们知道 condition 的 await 会释放锁,signal 的时候需要重新获取锁 lock.lock(); try { final Generation g = generation; // 检查栅栏是否被打破,如果被打破,抛出 BrokenBarrierException 异常 if (g.broken) throw new BrokenBarrierException(); // 检查中断状态,如果中断了,抛出 InterruptedException 异常 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // index 是这个 await 方法的返回值 // 注意到这里,这个是从 count 递减后得到的值 int index = --count; //最后一个线程到达后, 唤醒所有等待的线程,开启新的一代(设置新的generation) // 如果等于 0,说明所有的线程都到栅栏上了,准备通过 if (index == 0) { // tripped boolean ranAction = false; try { // 如果在初始化的时候,指定了通过栅栏前需要执行的操作,在这里会得到执行 final Runnable command = barrierCommand; if (command != null) command.run(); // 如果 ranAction 为 true,说明执行 command.run() 的时候,没有发生异常退出的情况 ranAction = true; // 唤醒等待的线程,然后开启新的一代 nextGeneration(); return 0; } finally { if (!ranAction) // 进到这里,说明执行指定操作的时候,发生了异常,那么需要打破栅栏 // 之前我们说了,打破栅栏意味着唤醒所有等待的线程,设置 broken 为 true,重置 count 为 parties breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out // 如果是最后一个线程调用 await,那么上面就返回了 // 下面的操作是给那些不是最后一个到达栅栏的线程执行的 for (;;) { try { // 如果带有超时机制,调用带超时的 Condition 的 await 方法等待,直到最后一个线程调用 await if (!timed) //此线程会添加到Condition条件队列中,并在此阻塞 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 如果到这里,说明等待的线程在 await(是 Condition 的 await)的时候被中断 if (g == generation && ! g.broken) { // 打破栅栏 breakBarrier(); // 打破栅栏后,重新抛出这个 InterruptedException 异常给外层调用的方法 throw ie; } else { Thread.currentThread().interrupt(); } } // 唤醒后,检查栅栏是否是“破的” if (g.broken) throw new BrokenBarrierException(); // 上面最后一个线程执行nextGeneration()后,generation被重写设置 // 我们要清楚,最后一个线程在执行完指定任务(如果有的话),会调用 nextGeneration 来开启一个新的代 // 然后释放掉锁,其他线程从 Condition 的 await 方法中得到锁并返回,然后到这里的时候,其实就会满足 g != generation 的,因为最后一个到达的线程已经重写设置了generation if (g != generation) return index; // 如果醒来发现超时了,打破栅栏,抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
我们看看怎么开启新的一代:
// 开启新的一代,当最后一个线程到达栅栏上的时候,调用这个方法来唤醒其他线程,同时初始化“下一代” private void nextGeneration() { // 首先,需要唤醒所有的在栅栏上等待的线程 trip.signalAll(); // 更新 count 的值 count = parties; // 重新生成“新一代” generation = new Generation(); }
看看怎么打破一个栅栏:
private void breakBarrier() { // 设置状态 broken 为 true generation.broken = true; // 重置 count 为初始值 parties count = parties; // 唤醒所有已经在等待的线程 trip.signalAll(); }
整个过程已经很清楚了。
下面我们来看看怎么得到有多少个线程到了栅栏上,处于等待状态:
public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }
判断一个栅栏是否被打破了,这个很简单,直接看 broken 的值即可:
public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } }
最后,我们来看看怎么重置一个栅栏:
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
总结
假设一个项目中用代码CyclicBarrier c= new CyclicBarrier(3)
构造一个CyclicBarrier对象, 阻塞等待用的是非超时版本的c.await()
,那么这里c.count的初始值就是3。
①当第一个线程执行到代码片段c.await()
进入到dowait方法中,dowait方法首先要尝试获取锁lock,由于它是第一个线程,此时没有线程竞争能立即获取到锁lock。获取到锁后,将当前需要阻塞等待的线程数count自减1,(count初始为3)此时count自减后为2(不为0),所以它会进入for循环,它一进入for自旋就执行trip.await(),当前(第一个)线程就休眠并释放锁lock .
②当第二个线程执行到代码片段c.await()
③当第三个线程执行到代码片段c.await()
进入到dowait方法中,dowait方法首先要尝试获取锁lock, 由于第二个线程在休眠后释放了锁lock,所以此线程也能立即获取到锁。获取到锁后,将当前需要阻塞等待的线程数count自减1,此时count自减后为0,方法进入代码块if (index == 0){...}
内部,若有barrierCommand,就先执行barrierCommand任务(由此可见,barrierCommand任务会在最后一个到达屏障点的线程中执行),之后再执行方法nextGeneration()
,然后从dowait方法return返回。可以看出第三个(最后一个到达屏障点的)线程执行到c.await()
不会休眠等待。
④nextGeneration()
方法很关键,此方法体中的trip.signalAll()
将唤醒前两个(所有)线程,使得前两个线程从trip.await()
的休眠中返回,继续执行for循环接下来的代码。在接下来的for循环代码中检测到if (g != generation)
条件成立(nextGeneration方法将重新创建一个Generation对象,并将引用赋给成员变量generation),从而从dowait方法中return返回,结束阻塞状态,这两个线程得以继续执行。
Semaphore
有了 CountDownLatch 的基础后,分析 Semaphore 会简单很多。Semaphore 是什么呢?它类似一个资源池(读者可以类比线程池),每个线程需要调用 acquire() 方法获取资源,然后才能执行,执行完后,需要 release 资源,让给其他的线程用。
套路解读:创建 Semaphore 实例的时候,需要一个参数 permits,这个基本上可以确定是设置给 AQS 的 state 的,然后每个线程调用 acquire 的时候,执行 state = state - 1,release 的时候执行 state = state + 1,当然,acquire 的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。
示例
假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接.
class SemaphoreTest { private static final int THREAD_COUNT = 30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10); ? public static void main(String[] args) { for (int i = 0; i < THREAD_COUNT; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { s.acquire(); System.out.println("save data"); s.release(); } catch (InterruptedException e) { ? } } }); } threadPool.shutdown(); } }
源码分析
Semaphore
内部主要有一个Sync
类型成员变量sync
, Sync
是继承抽象类AbstractQueuedSynchronizer的静态抽象内部类。
Semaphore利用父类AQS实现了一个共享锁,Sync有两个子类NonfairSync 和FairSync ,分另代表非公平锁、公平锁。共享锁的关键在于实现重写tryAcquireShared 和 tryReleaseShared 方法,这两个方法分别会被父类的模板方法acquireShared 、releaseShared 所调用。
Semaphore的默认构造方法使用非公平锁,Semaphore的构造方法有一个布尔型可选参数fair,此参数指定锁的公平锁。
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
静态内部类Sync
构造方法Sync(int)
将父类AbstractQueuedSynchronizer
的实例变量state
设置为指定的许可证数permits
。
Sync(int permits) { setState(permits);// }
getPermits()`返回的许可证数即是父类AQS的state值.
final int getPermits() { return getState(); }
构造方法:
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
这里和 ReentrantLock 类似,用了公平策略和非公平策略。
看 acquire 方法:
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public void acquireUninterruptibly() { sync.acquireShared(1); } public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); }
我们接下来看不抛出 InterruptedException 异常的 acquireUninterruptibly() 方法吧:
public void acquireUninterruptibly() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
前面说了,Semaphore 分公平策略和非公平策略,我们对比一下两个 tryAcquireShared 方法:
// 公平策略: protected int tryAcquireShared(int acquires) { for (;;) { // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作 // 这个就不分析了,第一篇AQS中已经讲过 if (hasQueuedPredecessors()) //进入到这里说明阻塞队列中已经有线程在等着获取资源 return -1; int available = getState(); int remaining = available - acquires; //当remaining最小为0时,会CAS设置state为0,成功返回remaining //当remaining小于0时,这里会直接返回remaining,这里不会执行compareAndSetState if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 非公平策略: protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
我们再回到 acquireShared 方法
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
当 tryAcquireShared(arg)大于或者等于0时,获取资源成功,接着执行acquire()后面的业务代码;
当 tryAcquireShared(arg) 返回小于 0 的时候,说明 state 已经小于 0 了(没资源了),此时 acquire 不能立马拿到资源,需要进入到阻塞队列等待,即执行上面第3行代码
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
线程挂起后等待有资源被 release 出来。接下来,我们就要看 release 的方法了:
// 任务介绍,释放一个资源 public void release() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; // 溢出,当然,我们一般也不会用这么大的数 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //释放资源后,将state的值又加上释放资源数 if (compareAndSetState(current, next)) return true; } }
tryReleaseShared 方法总是会返回 true,此时state的资源数已经加上了,然后是 doReleaseShared,这个也是我们熟悉的方法了,我就贴下代码,不分析了,这个方法用于唤醒所有的等待线程中的第一个等待的线程:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
第一个等待的线程被唤醒后,doReleaseShared终止,接着doAcquireShared()方法被唤醒接着运行,如果资源还够用,则继续唤醒下一个等待节点,可以看到doAcquireShared()方法中第11行处 设置当前节点为head节点,并唤醒下一个等待节点
Semphore 的源码确实很简单,方法都和CountDownLatch 中差不多,基本上都是分析过的老代码的组合使用了。
以上是关于CountDownLatchCyclicBarrierSemaphore源码解析的主要内容,如果未能解决你的问题,请参考以下文章