AQS源码剖析第三篇--共享模式
Posted 热爱编程的大忽悠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS源码剖析第三篇--共享模式相关的知识,希望对你有一定的参考价值。
AQS源码剖析第三篇--共享模式
系列文章:
这篇,我们的关注点是 AQS 最后的部分,AQS 共享模式的使用。
本文先用 CountDownLatch 将共享模式说清楚,然后顺着把其他 AQS 相关的类 CyclicBarrier、Semaphore 的源码一起过一下。
CountDownLatch
CountDownLatch 这个类是比较典型的 AQS 的共享模式的使用,这是一个高频使用的类。latch 的中文意思是门栓、栅栏,具体怎么解释我就不废话了,大家随意,看两个例子就知道在哪里用、怎么用了。
假设我们有3个任务,那么我们会用 3 来初始化一个 CountDownLatch,然后将这个 latch 的引用传递到各个线程中,在每个线程完成了任务后,调用 latch.countDown() 代表完成了一个任务。
调用 latch.await() 的方法的线程会阻塞,直到所有的任务完成。
package com.aqs;
import java.util.concurrent.CountDownLatch;
/**
* @author 大忽悠
* @create 2022/10/2 16:56
*/
public class AqsShare
/**
* 开始信号
*/
private static final CountDownLatch START_SIGNAL=new CountDownLatch(1);
/**
* 结束信号
*/
private static final CountDownLatch END_SIGNAL=new CountDownLatch(3);
public static void main(String[] args) throws InterruptedException
for (int i = 0; i < 3; ++i)
new Thread(new Worker(START_SIGNAL, END_SIGNAL)).start();
// 这边插入一些代码,确保上面的每个线程先启动起来,才执行下面的代码。
doSomethingElse();
// 因为这里 N == 1,所以,只要调用一次,那么所有的 await 方法都可以通过
START_SIGNAL.countDown();
doSomethingElse();
// 等待所有任务结束
END_SIGNAL.await();
System.out.println("所有任务执行结束");
private static void doSomethingElse()
try
Thread.sleep(1000L);
System.out.println("Main线程休息好了...");
catch (InterruptedException e)
e.printStackTrace();
public static class Worker implements Runnable
/**
* 开始信号
*/
private final CountDownLatch startSignal;
/**
* 结束信号
*/
private final CountDownLatch endSignal;
public Worker(CountDownLatch startSignal, CountDownLatch endSignal)
this.startSignal = startSignal;
this.endSignal = endSignal;
@Override
public void run()
try
// 为了让所有线程同时开始任务,我们让所有线程先阻塞在这里
// 等大家都准备好了,再打开这个门栓
startSignal.await();
doWork();
endSignal.countDown();
catch (InterruptedException e)
e.printStackTrace();
private void doWork() throws InterruptedException
System.out.printf("当前线程[%s],努力工作中...\\r\\n",Thread.currentThread().getName());
Thread.sleep(3000L);
System.out.printf("当前线程[%s],工作结束\\r\\n",Thread.currentThread().getName());
3个新开启的线程都调用了startSignal.await() 进行阻塞等待,它们阻塞在栅栏上,只有当条件满足的时候(startSignal.countDown()),它们才能同时通过这个栅栏,目的是让所有的线程站在一个起跑线上。
如果始终只有一个线程调用 await 方法等待任务完成,那么 CountDownLatch 就会简单很多,所以之后的源码分析读者一定要在脑海中构建出这么一个场景:有 m 个线程是做任务的,有 n 个线程在某个栅栏上等待这 m 个线程做完任务,直到所有 m 个任务完成后,n 个线程同时通过栅栏。
CountDownLatch 非常实用,我们常常会将一个比较大的任务进行拆分,然后开启多个线程来执行,等所有线程都执行完了以后,再往下执行其他操作。
源码分析
构造方法,需要传入一个不小于 0 的整数:
public CountDownLatch(int count)
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
CountDownLatch内部提供的同步器实现如下:
private static final class Sync extends AbstractQueuedSynchronizer
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count)
setState(count);
int getCount()
return getState();
//count计数为0,任务都完成了,就返回1,否则返回-1
protected int tryAcquireShared(int acquires)
return (getState() == 0) ? 1 : -1;
//count计数自旋减一,如果减去一后为0,则返回true,否则返回false
protected boolean tryReleaseShared(int releases)
// Decrement count; signal when transition to zero
for (;;)
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
代码都是套路,先分析套路:AQS 里面的 state 是一个整数值,这边用一个 int count 参数其实初始化就是设置了这个值,所有调用了 await 方法的等待线程会挂起,然后有其他一些线程会做 state = state - 1 操作,当 state 减到 0 的同时,那个将 state 减为 0 的线程会负责唤醒 所有调用了 await 方法的线程。
对于 CountDownLatch,我们仅仅需要关心两个方法,一个是 countDown() 方法,另一个是 await() 方法。
countDown() 方法每次调用都会将 state 减 1,直到 state 的值为 0;而 await 是一个阻塞方法,当 state 减为 0 的时候,await 方法才会返回。await 可以被多个线程调用,读者这个时候脑子里要有个图:所有调用了 await 方法的线程阻塞在 AQS 的阻塞队列中,等待条件满足(state == 0),将线程从队列中一个个唤醒过来。
await源码分析
首先,我们来看 await() 方法,它代表线程阻塞,等待 state 的值减为 0。
public void await() throws InterruptedException
//共享模式下响应中断的获取资源获取
sync.acquireSharedInterruptibly(1);
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException
//如果获取资源过程中发生了中断,会给予响应
if (Thread.interrupted())
throw new InterruptedException();
//尝试在共享模式下获取资源
if (tryAcquireShared(arg) < 0)
//资源未能获取成功,则准备进入阻塞队列
doAcquireSharedInterruptibly(arg);
//尝试在共享模式下去获取资源
//对于CountDownLatch来说,资源是否获取成功,等价于count是否为0
// 只有当 state == 0 的时候,这个方法才会返回 1,表示资源获取成功,那就不需要将当前线程弄到阻塞队列等着了
//如果资源获取失败,则返回-1,下一步就是让当前线程去阻塞队列等着了
protected int tryAcquireShared(int acquires)
return (getState() == 0) ? 1 : -1;
doAcquireSharedInterruptibly 从方法名我们就可以看出,这个方法是获取共享锁,并且此方法是可中断的(中断的时候抛出 InterruptedException 退出这个方法)。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException
//将当前线程以共享模式加入阻塞队列
final Node node = addWaiter(Node.SHARED);
try
//死轮询直到获取到共享锁
for (;;)
//只有前驱节点是head哑结点的情况下,才有机会去抢共享锁
final Node p = node.predecessor();
if (p == head)
//抢锁成功对于CountDownLatch意味着count=0
int r = tryAcquireShared(arg);
//返回值大于0,表示count=0了,那么需要链式唤醒后续其他阻塞线程
if (r >= 0)
//设置当前节点为头结点,并且唤醒阻塞队列上模式为shared的后续节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
//抢锁失败就挂起,和之前一样---这里抢锁失败意味着count!=0
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
catch (Throwable t)
cancelAcquire(node);
throw t;
如果当前阻塞队列中第一个节点绑定的线程阻塞被唤醒后,发现此时count=0了,那么会去链式唤醒在阻塞队列上的后续模式为shared的节点。
链式唤醒的逻辑存在于setHeadAndPropagate中:
private void setHeadAndPropagate(Node node, int propagate)
Node h = head;
//设置当前节点为新的头结点
setHead(node);
//如果当前被唤醒线程的node节点的下一个节点是共享模式,那么会通过doReleaseShared唤醒后继节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0)
Node s = node.next;
//链式唤醒的核心,后面分析doReleaseShared方法的时候就会明白
if (s == null || s.isShared())
doReleaseShared();
下面再通过流程图的方式把上面的流程过一遍:
- 线程 t3 经过第 1 步 addWaiter 入队以后,我们应该可以得到这个:
由于 tryAcquireShared 这个方法会返回 -1,所以 if (r >= 0) 这个分支不会进去。到 shouldParkAfterFailedAcquire 的时候,t3 将 head 的 waitStatus 值设置为 -1,如下:
然后进入到 parkAndCheckInterrupt 的时候,t3 挂起。
我们再分析 t4 入队,t4 会将前驱节点 t3 所在节点的 waitStatus 设置为 -1,t4 入队后,应该是这样的:
然后,t4 也挂起。接下来,t3 和 t4 就等待唤醒了。
接下来,我们来看唤醒的流程。为了让下面的示意图更丰富些,我们假设用 10 初始化 CountDownLatch。
countDown源码分析
public void countDown()
sync.releaseShared(1);
public final boolean releaseShared(int arg)
if (tryReleaseShared(arg))
doReleaseShared();
return true;
return false;
releaseShared是由AQS框架提供的模板方法,其中tryReleaseShared如何释放共享锁的逻辑应该由子类实现,因此我们来看看countDownLatch中是如何实现的吧。
protected boolean tryReleaseShared(int releases)
// Decrement count; signal when transition to zero
for (;;)
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
这个方法很简单,用自旋的方法实现 state 减 1,如果减去一后,state为0,返回true。
state为0,对于CountDownLatch来说,即count为0,即计数为0了,那么就需要唤醒所有因为await阻塞的线程了。
doReleaseShared是由AQS框架提供的,共享模式下唤醒逻辑:
// 调用这个方法的时候,state == 0
private void doReleaseShared()
for (;;)
Node h = head;
//阻塞队列不为空
if (h != null && h != tail)
int ws = h.waitStatus;
//说明头结点后面存在节点需要被唤醒
if (ws == Node.SIGNAL)
// 将 head 的 waitStatue 设置为 0---因为唤醒头结点的后继节点后,后继节点后将自己作为新的头结点
//旧的头结点会从阻塞队列中移除
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒 head 的后继节点,也就是阻塞队列中靠前面满足条件的节点
unparkSuccessor(h);
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo
continue; // loop on failed CAS
if (h == head) // loop if head changed
break;
这段代码先不看完,就看到unparkSuccessor(h),然后结束。
当unparkSuccessor(h)唤醒头节点的第一个后继节点后,因为此时count=0,所以:
setHeadAndPropagate开始了链式唤醒:
唤醒第一个后继节点后,还存在下面几种可能情况:
- 被唤醒的后继节点所在线程将自己设置为新的头结点时,调用CountDown的线程已经执行完了,到了 if (h == head)判断逻辑
// 调用这个方法的时候,state == 0
private void doReleaseShared()
for (;;)
Node h = head;
//阻塞队列不为空
if (h != null && h != tail)
int ws = h.waitStatus;
//说明头结点后面存在节点需要被唤醒
if (ws == Node.SIGNAL)
// 将 head 的 waitStatue 设置为 0---因为唤醒头结点的后继节点后,后继节点后将自己作为新的头结点
//旧的头结点会从阻塞队列中移除
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒 head 的后继节点,也就是阻塞队列中靠前面满足条件的节点
unparkSuccessor(h);`在这里插入代码片`
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo
continue; // loop on failed CAS
//此时头结点还没有被后继节点重新设置,因此满足条件,跳出循环
if (h == head) // loop if head changed
break;
- 被唤醒的后继节点所在线程将自己设置为新的头结点时,调用CountDown的线程还没执行完 if (h == head)判断逻辑
// 调用这个方法的时候,state == 0
private void doReleaseShared()
for (;;)
Node h = head;
//阻塞队列不为空
if (h != null && h != tail)
int ws = h.waitStatus;
//说明头结点后面存在节点需要被唤醒
if (ws == Node.SIGNAL)
// 将 head 的 waitStatue 设置为 0---因为唤醒头结点的后继节点后,后继节点后将自己作为新的头结点
//旧的头结点会从阻塞队列中移除
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒 head 的后继节点,也就是阻塞队列中靠前面满足条件的节点
unparkSuccessor(h);
//当唤醒到阻塞队列中最后一个节点时,自然此时ws为0了
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo
continue; // loop on failed CAS
//此时头结已经被后继节点重新设置,因此不满足条件,继续循环
if (h == head) // loop if head changed
break;
CyclicBarrier
字面意思是“可重复使用的栅栏”或“周期性的栅栏”,总之不是用了一次就没用了的,CyclicBarrier 相比 CountDownLatch 来说,要简单很多,其源码没有什么高深的地方,它是 ReentrantLock 和 Condition 的组合使用。看如下示意图,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。
首先,CyclicBarrier 的源码实现和 CountDownLatch 大相径庭,CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 来实现。
因为 CyclicBarrier 的源码相对来说简单许多,读者只要熟悉了前面关于 Condition 的分析,那么这里的源码是毫无压力的,就是几个特殊概念罢了。
先用一张图来描绘下 CyclicBarrier 里面的一些概念,和它的基本使用流程:
看图我们也知道了,CyclicBarrier 的源码最重要的就是 await() 方法了。
简单使用演示如下:
package com.aqs;
import lombok.SneakyThrows;
import java.util.concurrent.*;
/**
* @author 大忽悠
* @create 2022/10/2 16:56
*/
public class AqsShare
public static void main(String[] args) throws InterruptedException
CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () ->
System.out.println("所有线程已经到达栅栏处,准备放行");
);
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++)
executorService.execute(()->
try
while(true)
System.out.println("马上到达栅栏处...");
sleepRandom();
cyclicBarrier.await();
System.out.println("栅栏放开...");
catch (InterruptedException |以上是关于AQS源码剖析第三篇--共享模式的主要内容,如果未能解决你的问题,请参考以下文章