AQS源码剖析第一篇---全貌概览
Posted 热爱编程的大忽悠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS源码剖析第一篇---全貌概览相关的知识,希望对你有一定的参考价值。
AQS源码剖析第一篇---全貌概览
引言
AQS全称AbstractQueuedSynchronizer(基于队列实现的抽象同步器),它是 Java 并发包的基础工具类,是实现 ReentrantLock、CountDownLatch、Semaphore、FutureTask 等类的基础。
- AQS能为我们提供什么?
- 同步状态的原子性管理。
- 线程的阻塞和解除阻塞。
- 提供阻塞线程的存储队列。
基于这三大功能,衍生出下面的附加功能:
- 通过中断实现的任务取消,此功能基于线程中断实现。
- 可选的超时设置,也就是调用者可以选择放弃等待任务执行完毕直接返回。
- 定义了Condition接口,用于支持管程形式的await/signal/signalAll操作,代替了Object类基于JNI提供的wait/notify/notifyAll。
AQS还根据同步状态的不同管理方式区分为两种不同的实现:独占状态的同步器和共享状态的同步器。
AQS 结构
先来看看 AQS 有哪些属性,搞清楚这些基本就知道 AQS 是什么套路了,毕竟可以猜嘛!
// 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的
private transient volatile Node head;
// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
private transient volatile Node tail;
// 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
// 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
private volatile int state;
// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) state++
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer
CLH队列
AQS的核心在于他的队列实现,AQS借鉴了CLH的思想,并衍生出符合AQS场景下的变体实现,因此我们需要先来了解一下CLH队列实现思想。
CLH锁底层是基于队列实现,一般也称为CLH队列锁。CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。从实现上看,CLH锁是一种自旋锁,能确保无饥饿性,提供先来先服务的公平性。先看简单的CLH锁的一个简单实现:
package com.TheadTest;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class CLHLock implements Lock
/**
* 队列尾部节点
*/
AtomicReference<QueueNode> tail = new AtomicReference<>(new QueueNode());
/**
* 前驱节点
*/
ThreadLocal<QueueNode> pred;
/**
* 当前线程对应的节点
*/
ThreadLocal<QueueNode> current;
/**
* 初始化头结点,头结点的前驱节点为空
*/
public CLHLock()
current = ThreadLocal.withInitial(QueueNode::new);
pred = ThreadLocal.withInitial(() -> null);
/**
* 1.拿到当前线程对应的节点,并上锁 <p>
* 2.设置尾结点为当前节点,并返回旧的尾结点 <p>
* 3.旧的尾结点,作为当前节点的前驱节点 <p>
* 4.不断轮询,直到前驱节点释放了锁 <p>
*
* ps: 没获取到锁,就一直阻塞,直到前驱节点释放了锁
*/
@Override
public void lock()
QueueNode node = current.get();
node.locked = true;
QueueNode pred = tail.getAndSet(node);
this.pred.set(pred);
while (pred.locked)
/**
* 1.获取当前线程对应的节点 <p>
* 2.当前节点解锁 <p>
* 3.设置当前节点重新指向它的前驱节点 <p>
*/
@Override
public void unlock()
QueueNode node = current.get();
node.locked = false;
current.set(this.pred.get());
static class QueueNode
//确保线程可见性
volatile boolean locked;
// 忽略其他接口方法的实现
@Override
public Condition newCondition()
return null;
@Override
public void lockInterruptibly() throws InterruptedException
@Override
public boolean tryLock()
return false;
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
return false;
上面是一个简单的CLH队列锁的实现,内部类QueueNode只使用了一个简单的布尔值locked属性记录了每个线程的状态,如果该属性为true,则相应的线程要么已经获取到锁,要么正在等待锁,如果该属性为false,则相应的线程已经释放了锁。新来的想要获取锁的线程必须对tail属性调用getAndSet()方法,使得自身成为队列的尾部,同时得到一个指向前驱节点的引用pred,最后线程所在节点在其前驱节点的locked属性上自旋,直到前驱节点释放锁。上面的实现是无法运行的,因为一旦自旋就会进入死循环导致CPU飙升,可以尝试使用下文将要提到的LockSupport进行改造。
CLH队列锁本质是使用队列(实际上是单向链表)存放等待获取锁的线程,等待的线程总是在其所在节点的前驱节点的状态上自旋,直到前驱节点释放资源。从实际来看,过度自旋带来的CPU性能损耗比较大,并不是理想的线程等待队列的实现。
基于原始的CLH队列锁中提供的等待队列的基本原理,AQS实现一种了CLH锁队列的变体(Variant)。AQS类的protected修饰的构造函数里面有一大段注释用于说明AQS实现的等待队列的细节事项,这里列举几点重要的:
- AQS实现的等待队列没有直接使用CLH锁队列,但是参考了其设计思路,等待节点会保存前驱节点中线程的信息,内部也会维护一个控制线程阻塞的状态值。
- 每个节点都设计为一个持有单独的等待线程并且”带有具体的通知方式”的监视器,这里所谓通知方式就是自定义唤醒阻塞线程的方式而已。
- 一个线程是等待队列中的第一个等待节点的持有线程会尝试获取锁,但是并不意味着它一定能够获取锁成功(这里的意思是存在公平和非公平的实现),获取失败就要重新等待。
- 等待队列中的节点通过prev属性连接前驱节点,通过next属性连接后继节点,简单来说,就是双向链表的设计。
- CLH队列本应该需要一个虚拟的头节点,但是在AQS中没有直接提供虚拟的头节点,而是延迟到第一次竞争出现的时候懒创建虚拟的头节点(其实也会创建尾节点,初始化时头尾节点是同一个节点)。
- Condition(条件)等待队列中的阻塞线程使用的是相同的Node结构,但是提供了另一个链表用来存放,Condition等待队列的实现比非Condition等待队列复杂。
线程阻塞与唤醒
线程的阻塞和唤醒在JDK1.5之前,一般只能依赖于Object类提供的wait()、notify()和notifyAll()方法,它们都是JNI方法,由JVM提供实现,并且它们必须运行在获取监视器锁的代码块内(synchronized代码块中),这个局限性先不谈性能上的问题,代码的简洁性和灵活性是比较低的。JDK1.5引入了LockSupport类,底层是基于Unsafe类的park()和unpark()方法,提供了线程阻塞和唤醒的功能,它的机制有点像只有一个允许使用资源的信号量java.util.concurrent.Semaphore,也就是一个线程只能通过park()方法阻塞一次,只能调用unpark()方法解除调用阻塞一次,线程就会唤醒(多次调用unpark()方法也只会唤醒一次),可以想象是内部维护了一个0-1的计数器。
LockSupport类如果使用得好,可以提供更灵活的编码方式,这里举个简单的使用例子:
package com.TheadTest;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.locks.LockSupport;
public class LockSupportMain implements Runnable
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
private Thread thread;
private void setThread(Thread thread)
this.thread = thread;
public static void main(String[] args) throws Exception
LockSupportMain main = new LockSupportMain();
Thread thread = new Thread(main, "LockSupportMain");
main.setThread(thread);
thread.start();
Thread.sleep(2000);
main.unpark();
Thread.sleep(2000);
@Override
public void run()
System.out.println(String.format("%s-步入run方法,线程名称:%s", FORMATTER.format(LocalDateTime.now()),
Thread.currentThread().getName()));
LockSupport.park();
System.out.println(String.format("%s-解除阻塞,线程继续执行,线程名称:%s", FORMATTER.format(LocalDateTime.now()),
Thread.currentThread().getName()));
private void unpark()
LockSupport.unpark(thread);
LockSupport类park()方法也有带超时的变体版本方法,遇到带超时期限阻塞等待场景下不妨可以使用LockSupport#parkNanos()。
独占线程的保存
AbstractOwnableSynchronizer是AQS的父类,一个同步器框架有可能在一个时刻被某一个线程独占,AbstractOwnableSynchronizer就是为所有的同步器实现和锁相关实现提供了基础的保存、获取和设置独占线程的功能,这个类的源码很简单:
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer()
// 当前独占线程的瞬时实例 - 提供Getter和Setter方法
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread)
exclusiveOwnerThread = thread;
protected final Thread getExclusiveOwnerThread()
return exclusiveOwnerThread;
它就提供了一个保存独占线程的变量对应的Setter和Getter方法,方法都是final修饰的,子类只能使用不能覆盖。
AQS提供的CLH变体实现
这里先重点分析一下AQS中等待队列的节点AQS的静态内部类Node的源码:
static final class Node
// 标记一个节点处于共享模式下的等待
static final Node SHARED = new Node();
// 标记一个节点处于独占模式下的等待
static final Node EXCLUSIVE = null;
// 取消状态
static final int CANCELLED = 1;
// 唤醒状态
static final int SIGNAL = -1;
// 条件等待状态
static final int CONDITION = -2;
// 传播状态
static final int PROPAGATE = -3;
// 等待状态,初始值为0,其他可选值是上面的4个值
volatile int waitStatus;
// 当前节点前驱节点的引用
volatile Node prev;
// 当前节点后继节点的引用
volatile Node next;
// 当前节点持有的线程,可能是阻塞中等待唤醒的线程
volatile Thread thread;
// 下一个等待节点
Node nextWaiter;
// 当前操作的节点是否处于共享模式
final boolean isShared()
return nextWaiter == SHARED;
// 获取当前节点的前驱节点,确保前驱节点必须存在,否则抛出NPE
final Node predecessor()
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
// 空节点,主要是首次创建队列的时候创建的头和尾节点使用
Node()
// 设置下一个等待节点,设置持有线程为当前线程
Node(Node nextWaiter)
this.nextWaiter = nextWaiter;
THREAD.set(this, Thread.currentThread());
// 设置waitStatus,设置持有线程为当前线程
Node(int waitStatus)
WAITSTATUS.set(this, waitStatus);
THREAD.set(this, Thread.currentThread());
// CAS更新waitStatus
final boolean compareAndSetWaitStatus(int expect, int update)
return WAITSTATUS.compareAndSet(this, expect, update);
// CAS设置后继节点
final boolean compareAndSetNext(Node expect, Node update)
return NEXT.compareAndSet(this, expect, update);
// 设置前驱节点
final void setPrevRelaxed(Node p)
PREV.set(this, p);
// 下面是变量句柄的实现,在VarHandle出现之前使用的是Unsafe,其实底层还是照样使用Unsafe
private static final VarHandle NEXT;
private static final VarHandle PREV;
private static final VarHandle THREAD;
private static final VarHandle WAITSTATUS;
static
try
MethodHandles.Lookup l = MethodHandles.lookup();
NEXT = l.findVarHandle(Node.class, "next", Node.class);
PREV = l.findVarHandle(Node.class, "prev", Node.class);
THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
catch (ReflectiveOperationException e)
throw new ExceptionInInitializerError(e);
其中,变量句柄(VarHandle)是JDK9引入的新特性,其实底层依赖的还是Unsafe的方法,笔者认为可以简单理解它为Unsafe的门面类,而定义的方法基本都是面向变量属性的操作。这里需要关注一下Node里面的几个属性:
waitStatus:当前Node实例的等待状态,可选值有5个。
- 初始值整数0:当前节点如果不指定初始化状态值,默认值就是0,侧面说明节点正在等待队列中处于等待状态,一般是队尾节点。
- Node#CANCELLED整数值1:表示当前节点实例因为超时或者线程中断而被取消,等待中的节点永远不会处于此状态,被取消的节点中的线程实例不会阻塞。
- Node#SIGNAL整数值-1:表示当前节点的后继节点是(或即将是)阻塞的(通过LockSupport#park()),当它释放或取消时,当前节点必须LockSupport#unpark()它的后继节点。
- Node#CONDITION整数值-2:表示当前节点是条件队列中的一个节点,当它转换为同步队列中的节点的时候,状态会被重新设置为0。
- Node#PROPAGATE整数值-3:此状态值通常只设置到调用了doReleaseShared()方法的头节点,确保releaseShared()方法的调用可以传播到其他的所有节点,简单理解就是共享模式下节点释放的传递标记。
prev、next:当前Node实例的前驱节点引用和后继节点引用。
thread:当前Node实例持有的线程实例引用。
nextWaiter:这个值是一个比较容易令人生疑的值,虽然表面上它称为”下一个等待的节点”,但是实际上它有三种取值的情况。
- 值为静态实例Node.EXCLUSIVE(也就是null),代表当前的Node实例是独占模式。
- 值为静态实例Node.SHARED,代表当前的Node实例是共享模式。
- 值为非Node.EXCLUSIVE和Node.SHARED的其他节点实例,代表Condition等待队列中当前节点的下一个等待节点。
AbstractQueuedSynchronizer 的等待队列示意如下所示,注意了,之后分析过程中所说的 queue,也就是阻塞队列不包含 head,不包含 head,不包含 head。
源码流程追踪
我们从ReentrantLock的公平锁源码实现开始讲起,首先,我们先看下 ReentrantLock 的使用方式。
// 我用个web开发中的service概念吧
public class OrderService
// 使用static,这样每个线程拿到的是同一把锁,当然,spring mvc中service默认就是单例,别纠结这个
private static ReentrantLock reentrantLock = new ReentrantLock(true);
public void createOrder()
// 比如我们同一时间,只允许一个线程创建订单
reentrantLock.lock();
// 通常,lock 之后紧跟着 try 语句
try
// 这块代码同一时间只能有一个线程进来(获取到锁的线程),
// 其他的线程在lock()方法上阻塞,等待获取到锁,再进来
// 执行代码...
// 执行代码...
// 执行代码...
finally
// 释放锁
reentrantLock.unlock();
ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的。
abstract static class Sync extends AbstractQueuedSynchronizer
Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁),我们看 FairSync 部分。
public ReentrantLock(boolean fair)
sync = fair ? new FairSync() : new NonfairSync();
公平锁acquire源码实现
static final class FairSync extends Sync
private static final long serialVersionUID = -3000897897090466540L;
// 争锁
final void lock()
acquire(1);
// 来自父类AQS,我直接贴过来这边,下面分析的时候同样会这样做,不会给读者带来阅读压力
// 我们看到,这个方法,如果tryAcquire(arg) 返回true, 也就结束了。
// 否则,acquireQueued方法会将线程压到队列中
public final void acquire(int arg) // 此时 arg == 1
// 首先调用tryAcquire(1)一下,名字上就知道,这个只是试一试
// 因为有可能直接就成功了呢,也就不需要进队列排队了,
// 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待(又是挂起,又是等待被唤醒的)
if (!tryAcquire(arg) &&
// tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
// 尝试直接获取锁,返回值是boolean,代表是否获取到锁
// 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取
protected final boolean tryAcquire(int acquires)
final Thread current = Thread.currentThread();
int c = getState();
// state == 0 此时此刻没有线程持有锁
if (c == 0)
// 虽然此时此刻锁是可以用的,但是这是公平锁,既然是公平,就得讲究先来后到,
// 看看有没有别人在队列中等了半天了
if (!hasQueuedPredecessors() &&
// 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了,
// 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_=
// 因为刚刚还没人的,我判断过了
compareAndSetState(0, acquires))
// 到这里就是获取到锁了,标记一下,告诉大家,现在是我占用了锁
setExclusiveOwnerThread(current);
return true;
// 会进入这个else if分支,说明是重入了,需要操作:state=state+1
// 这里不存在并发问题
else if (current == getExclusiveOwnerThread()) 《并发系列一》AbstractQueuedSynchronizer(AQS)- 互斥锁源码剖析
全网最详细的AbstractQueuedSynchronizer(AQS)源码剖析资源的获取和释放
AbstractQueuedSynchronizer AQS框架源码剖析