AQS原理初探
Posted 风在哪
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS原理初探相关的知识,希望对你有一定的参考价值。
AQS原理初探
AQS全称为AbstractQueuedSynchronizer,如果直接按名字翻译的话就是抽象队列式同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类的实现都依赖于它,如ReentrantLock(可重入锁)、Semaphore(信号量)等等。它是构建锁或者其他同步组件的基础框架。
可重入锁
这里解释一下可重入锁:可重入锁就是如果某个线程已经获得某个锁,可以再次获取该锁而不会导致死锁。ReentrantLock以及synchronized都是可重入锁,其中ReentrantLock需要自己手动释放(如果获取次数和释放次数不一致会有问题),而synchronized会自动释放。
AQS成员变量
private transient volatile Node head; // 队列的头节点
private transient volatile Node tail; // 队列的尾节点
private volatile int state; // 标识同步状态
使用int类型的state是AQS定义线程获取锁的模式包括独占模式和共享模式,所以使用int可以标识这两种状态。
AQS如何实现同步
AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获得了锁,当state=0时表示释放了锁,它提供了三个方法getState(),setState(int newState),compareAndSetState(int expect, int update)来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。
state是volatile类型的变量,volatie保证了共享变量的可见性和有序性,但是不保证原子性,所以这里还有一个cas操作保证了设置state操作的原子性。
private volatile int state;
protected final int getState() { // 返回当前状态的值
return state;
}
protected final void setState(int newState) { // 设置当前状态的值
state = newState;
}
// 以cas的方式设置当前状态的值
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS定义了两种资源共享方式:Exclusive(独占方式,只有一个线程执行)和Share(共享式,多个线程可以同时执行)
CLH同步队列
CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其数据结构如下
队列的node节点包含如下字段:
/** 标识该节点在共享模式 */
static final Node SHARED = new Node();
/** 标识该节点在独占模式 */
static final Node EXCLUSIVE = null;
/** 标识此线程取消了争夺锁的操作 */
static final int CANCELLED = 1;
// 表示当前节点的后继节点对应的线程需要被唤醒
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
// 当前线程的状态
volatile int waitStatus;
/**
* 当前结点的前置结点
*/
volatile Node prev;
/**
* 当前结点的下一个结点
*/
volatile Node next;
/**
* 使该结点排队的线程,在构造函数中初始化,并且用完以后置为null
*/
volatile Thread thread;
/**
* 指向下一个等待某一条件的结点,或者处于SHARED状态的结点
*/
Node nextWaiter;
如何自定义同步器
不同的自定义的同步器争用共享资源的方式不同,自定义同步器在实现时只需要实现共享资源的state的获取于释放方式即可,至于具体线程等待队列的维护AQS顶层已经为我们实现了,自定义同步器需要实现以下方法:
- isHeldExclusively():该线程是否正在独占资源
- tryAcquire(int):以独占的方式尝试获取资源,成功则返回true,失败则返回false,成功获取同步状态以后,其他线程需要等待该线程释放同步状态才能获取同步状态
- tryRelease(int):以独占的方式去尝试释放资源,成功则返回true,失败返回false
- tryAcquireShared(int):以共享方式尝试获取资源,其中返回负数表示失败;0表示成功,但没有剩余资源可用;整数表示成功,且有剩余资源
- tryReleaseShared(int):以共享方式尝试释放资源,成功则返回true,失败则返回false
当然AQS还包含其他的已经给我们实现好的方法:
- acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
- acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
- tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
- acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
- acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
- tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
- release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
- releaseShared(int arg):共享式释放同步状态;
源码浅析
获得锁
如果想要获取锁的话,会调用tryAcquire方法,我们来看一下这个方法:
/*
通过这个源码可以看出这个方法会返回boolean值,判断是否成功获得锁,其中使用int型变量arg来修改state
这个方法没有具体的实现,需要继承AQS的类自己去实现,给上层的调用开放空间,上层可以自由编写业务逻辑
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
我们来看一个具体的实现:
// 这是ReentrantLock的内部类,FairSync继承自Sync,Sync是继承了AbstractQueuedSynchronizer
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
当获取锁成功,那么线程就可以对资源进行操作,使用完成之后再释放,如果获取锁失败呢?
当获取锁失败的时候,如果上层业务不想等待锁,那么就可以进行相应的处理(例如直接返回false);如果等待的话, 可以调用acquire方法,而不用自己去实现复杂的排队处理,这就是框架灵活的地方,我们来看看这个acquire方法:
// 使用关键字final修饰,所有的继承类不能改变此方法
public final void acquire(int arg) {
/*
如果tyrAcquire()获取锁成功,那么就直接返回,跳出这个方法
否则会调用acquireQueued方法,这个里面会
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上述的方法调用了addWaiter和acquireQueued方法,我们首先来看看addWaiter方法:
// 将当前线程封装为node节点加入等待队列
// 先尝试进行快速入队,如果失败则再进行完整的入队
// 注意这里返回的是尾节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 当前尾节点为空,或者cas操作入队失败的话,会调用enq方法,进入完整的入队方法
if (pred != null) {
node.prev = pred;
// if里面的内容会不会产生线程安全问题?
if (compareAndSetTail(pred, node)) {
// 这里只是将前置节点的指针指向了当前节点,即使这时候尾节点发生变化也不会产生线程安全问题
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
// 自旋,采用尾插法
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
接下来就是acquireQueued方法:
- 如果当前线程所在的节点为头节点的下一个节点,那么将会不断的尝试拿锁,直到拿锁成功,否则将会判断是否需要挂起
- 如何判断是不是需要挂起,如果当前线程所在节点之前除了head还有其他节点,那这些节点的waitStatus为signal,那这些节点就需要挂起,保证head之后只有一个节点获取锁,其他节点都处于已挂起或正在被挂起状态,避免无用的自旋
- 当一个线程使用完资源,并且尝试去释放锁的时候应该尝试去唤醒挂起的线程,
final boolean acquireQueued(final Node node, int arg) {
// 从finally方法块中,我们可以看出,当方法正常执行时,failed永远是为false,不会执行cancelAcquire方法,只有方法抛出异常,failed为true进入finally以后才会调用cancelAcquire方法
boolean failed = true;
try {
//
boolean interrupted = false;
// 自旋操作
for (;;) {
final Node p = node.predecessor();
// 如果当前节点的前置结点为头节点,而且当前线程尝试获取锁成功了,那么直接返回即可
// 在这个地方,头节点其实是一个空节点,只是充当一个标识符,头节点的next节点才是真正需要拿锁的节点,当他拿到锁以后,它就变成了头节点,并且会将p节点置为空,使它参与垃圾回收。
// 判断当前节点有没有资格拿锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断当前线程是否需要挂起,减少自旋等待的线程,减少cpu的压力,提升性能
// shouldParkAfterFailedAcquire返回true,则当前线程需要挂起
// 如果当前线程需要挂起,并且成功挂起,而且Thread.interrupted()的放回为true时,interrupted会被置为true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// cancelAcquire就是将node.waitStatus置为cancel,并进行一定的清理操作
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 当waitStatus>0时,只可能是cancel状态,此时直接将节点从等待队列删除
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
// 删除前置结点为cancel的node
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// cas操作将前置节点的waitStatus置为signal
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/* 这里返回Thread.interrupted()意欲何为?若当前线程被挂起,
那么在此期间,很有可能有aqs以外的其他操作中断这个线程,
调用了该线程的interrupt方法,(如果线程调用wait或sleep等方法挂起,
那么interrupt方法可能会抛出异常),使用中断原语LockSupport.park挂起后,
再中断是不会抛出异常的,如果其他某个地方调用了该线程的interrupt方法,
只会改变线程内部的中断状态值,所以我们需要通过变量记录这个值。*/
/* 如果外部调用了线程的interrupt,那么该线程被唤醒的时候,
此时interrupted为true,会将中断信息带到外层,这样将会调用外层的selfInterrupt()方法*/
/* 当线程处于等待队列时,无法响应外部的中断请求,
只有当这个线程拿到锁之后再进行中断响应*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
释放锁
tryRelease也是将实现交给继承它的子类,提高编程的自由度
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
release中,如果尝试释放锁成功,那么将尝试唤醒队列中的其他节点
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/* 传进来的是头节点,该方法是为了唤醒头节点后的node,
此时head节点其实已经完成了所有的操作,需要先将他的waitStatus置为0,
不影响其他函数的判断*/
/* 然后程序开始从这个先进先出的队列的尾节点开始搜索,
找到除了head节点以外最靠前的waitStatus小于等于0的节点,
然后唤醒该线程,当线程被唤醒后它会自行执行acquireQueued方法,
尝试自旋的获取锁。当他被唤醒之前还被阻塞在那里。*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
为什么唤醒操作不直接从头开始搜索,而是从后往前搜索?
从后往前遍历是因为插入的时候,cas和next节点赋值的时候可能会有其他线程打断,导致从前往后遍历会出现null。
在enq方法里,在执行“if(compareAndSetTail(t, node)){//…}”时,cas是原子操作,但是当cas成功(tail指向当前node),执行if代码块里的内容时,此时不是同步操作。这个时刻,node与前一个节点t之间,node的prev指针在cas操作之前已经建立,而t的next指针还未建立。此时若其他线程调用了unpark操作,从头开始找就无法遍历完整的队列,而从后往前找就可以
参考
以上是关于AQS原理初探的主要内容,如果未能解决你的问题,请参考以下文章