源码剖析:AQS-AbstractQueuedSynchronizer
Posted talk.push
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码剖析:AQS-AbstractQueuedSynchronizer相关的知识,希望对你有一定的参考价值。
文章目录
有的时候在一段代码上花费大量时间,在一段代码上反复花时间,都是值得的。本文的jdk版本是1.8.
本文将演示AQS如何在多线程场景下进行入队和出队操作。
从一段代码说起
这段代码中通过ReentranLock来演示在多线程环境下,ReentranLock是如何通过底层的AQS对象来实现同步的。
package com.jeff.study.concurrent.aqs;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description AQS的抢锁和入队过程
* @Date 2021/1/31 7:36 下午
* @Author jeff.sheng
* @see AbstractQueuedSynchronizer
* @see ReentrantLock
*/
public class AQSDemo
public static void main(String[] args) throws InterruptedException
ReentrantLock lock = new ReentrantLock();
ExecutorService executorService = Executors.newFixedThreadPool(2);
Runnable runnable = () ->
System.out.println("当前线程: " + Thread.currentThread().getName() + " 请输入:");
lock.lock();
try
System.in.read();
catch (IOException e)
e.printStackTrace();
finally
lock.unlock();
;
executorService.execute(runnable);
executorService.execute(runnable);
executorService.awaitTermination(200, TimeUnit.SECONDS);
executorService.shutdown();
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer,抽象队列同步器。包含了state、head和tail两个Node类型的变量。
Node是AbstractQueuedSynchronizer的内部类,提供属性如下:
- prev:类型为Node,表示当前节点的前一个节点。
- next:类型为Node,表示当前节点的后一个节点。
- waitStatus:表示当前节点线程的状态。比如CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3).
- thread:表示当前AQS队列中当前节点的线程对象。
- nextWaiter:AQS队列中下一个等待节点。
ReentranLock
ReentranLock跟AbstractQueuedSynchronizer的关系
ReentranLock类内部定义了一个Sync类型的变量,而Sync则是ReentranLock的一个抽象的静态内部类,继承了AbstractQueuedSynchronizer抽象类。
abstract static class Sync extends AbstractQueuedSynchronizer
...
而AbstractQueuedSynchronizer又继承了AbstractOwnableSynchronizer类,这个类需要重点看下,因为里边有一个AQS的独占线程对象方法setExclusiveOwnerThread:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable
.....
package java.util.concurrent.locks;
import java.io.Serializable;
public abstract class AbstractOwnableSynchronizer implements Serializable
private static final long serialVersionUID = 3737899427754241961L;
private transient Thread exclusiveOwnerThread;
protected AbstractOwnableSynchronizer()
// 设置AQS的独占线程对象
protected final void setExclusiveOwnerThread(Thread thread)
this.exclusiveOwnerThread = thread;
protected final Thread getExclusiveOwnerThread()
return this.exclusiveOwnerThread;
Sync则继续提供了两种实现,也就是FairSync和NonfailSync.也就是我们常说的公平锁和非公平锁。ReentranLock则提供了非公平Sync的默认构造器,且提供了可选的构造器。
/**
* Creates an instance of @code ReentrantLock.
* This is equivalent to using @code ReentrantLock(false).
*/
public ReentrantLock()
sync = new NonfairSync();
/**
* Creates an instance of @code ReentrantLock with the
* given fairness policy.
*
* @param fair @code true if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair)
sync = fair ? new FairSync() : new NonfairSync();
NonfairSync
ReentranLock的内部非公平实现NonfairSync代码如下:
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock()
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
protected final boolean tryAcquire(int acquires)
return nonfairTryAcquire(acquires);
lock方法
我们先看下lock方法:
final void lock()
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
当线程T1进入时,通过CAS操作修改state变量的状态.
- 如果此时没有其他线程一起竞争,则返回true。然后设置当前独占锁的线程对象为当前线程(Thread.currentThread()).
- 如果有线程T2一起竞争,则进入acquire(1)方法。
acquire方法
acquire方法由NonfairSync的父类AbstractQueuedSynchronizer提供。
public final void acquire(int arg)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
我们先看tryAcquire方法,这个方法由NonfairSync实现。
protected final boolean tryAcquire(int acquires)
return nonfairTryAcquire(acquires);
nonfairTryAcquire方法由ReentranLock提供。
final boolean nonfairTryAcquire(int acquires)
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0)
if (compareAndSetState(0, acquires))
setExclusiveOwnerThread(current);
return true;
else if (current == getExclusiveOwnerThread())
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
return false;
当线程T1和T2都过来时,先获取AQS的state值,state值是一个volitale变量。
- 如果T1和T2都发现state值等于0则进行CAS操作设置state的值为1,但最终只有一个线程设置成功,并设置独占锁的线程为当前线程。假如T1抢占锁成功则返回true。整个流程结束。那么T2则返回false,抢锁失败!
- 如果T1先进来抢占锁成功了,那么state值就会通过CAS操作设置为1.由于volitale变量的可见性,T2进来时发现state不是0,而且发现当前抢锁成功的线程是T1,就直接返回false了,抢锁失败!
- 如果T1先抢锁成功且未释放锁,然后又再次进入此方法,那么发现自己就是拥有锁的线程,则给state值加1,锁重入。
以上我们说明了T1线程运气好抢锁成功的场景,而T2线程运气差总是抢锁失败。那么T2抢占锁失败之后返回了false,会如何呢?可以从tryAcquire方法看到,抢锁失败之后,会进一步执行下边的判断:
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
我们来看下addWaiter方法。
addWaiter方法
Node(Thread thread, Node mode) // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode)
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null)
node.prev = pred;
if (compareAndSetTail(pred, node))
pred.next = node;
return node;
enq(node);
return node;
addWaiter方法传进来的参数是Node.EXCLUSIVE,看一下Node节点中对它的解释:
static final class Node
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** 标记Node节点是独占模式等待状态 */
static final Node EXCLUSIVE = null;
..........
T2虽然抢锁失败了,但是T2将会以独占模式节点的状态继续等待。
T2进来addWaiter方法时,会包装为一个Node节点。假设T2进入时,此时AQS的tail节点尚未初始化。也就是说AQS对象的tail和head两个头尾Node节点都是NULL。
那么就会进入enq方法先初始化:
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node)
for (;;)
Node t = tail;
if (t == null) // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
else
node.prev = t;
if (compareAndSetTail(t, node))
t.next = node;
return t;
可以看到tail赋值给t,t此时为null’就进行头结点的初始化了,然后把初始化的head节点引用给了tail。
继续for循环,tail就不为空了,tail赋值给t也不为空,就是初始化的那个节点。将其赋值给T2独占等待状态节点的prev节点。
然后将t,也就是尾结点TAIL更新为T2独占等待节点。
这样以来T2独占等待节点就成了AQS队列中的一个尾节点。接下来最重要的获取AQS队列节点的操作来了。
acquireQueued方法
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return @code true if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg)
boolean failed = true;
try
boolean interrupted = false;
for (;;)
final Node p = node.predecessor();
if (p == head && tryAcquire(arg))
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
finally
if (failed)
cancelAcquire(node);
方法传递进来的参数就是T2独占等待节点,根据前边的描述,它的前一个节点就是哨兵节点也就是head节点,此时T2虽然抢锁失败而进入队列,但是它并不死心,它觉得此时T1可能已经释放锁了,所以想再次tryAcquire一次,如果真的如T2所愿,T1执行结束真的unlock的话,就会执行以下代码并返回interrupted值为false。
if (p == head && tryAcquire(arg))
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
/**
*/将T2独占等待节点指针设置给head节点,也就是T2节点将从队列中出队。设置thread和prev都为null。
*/
private void setHead(Node node)
head = node;
node.thread = null;
node.prev = null;
但如果是T2想的美呢?也就是T1根本就没结束还在持有锁,那么tryAcquire获取的state值就不是0而且持有锁的线程还是T1.那么不好意思,T2线程Acquire失败了,此时将尝试park住T2线程。
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return @code true if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0)
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do
node.prev = pred = pred.prev;
while (pred.waitStatus > 0);
pred.next = node;
else
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
return false;
通过debug来走读下这段代码吧!
T2进来之后,先判断pred节点,也就是T2独占等待节点的prev节点的waitStatus状态,可以看到值为0,回顾下开始时提到的waitStatus:
waitStatus:表示当前节点线程的状态。比如CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3).
所以都不满足,于是就使用CAS更新prev节点waitStatus的状态(更新为SIGNAL=-1):
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
然后继续执行acquireQueued方法中的下一次for循环,仍然进入了shouldParkAfterFailedAcquire方法中,但这一次由于waitStatus=-1满足第一个if条件,于是返回true。再继续判断parkAndCheckInterrupt方法是否满足条件:
private final boolean parkAndCheckInterrupt()
LockSupport.park(this);
return Thread.interrupted();
当T2线程进入调用LockSupport.park方法时,T2线程阻塞在这里了。此时此刻,T1线程阻塞在:
System.in.read()
T2线程阻塞在LockSupport.park方法。通过调用jstack方法可以观察到此时的线程stack信息:
而T1线程则是RUNABLE状态:
当在控制台输入一行字符串时,T2线程返回了。因为T1线程此时unlock了。
然后T2线程调用Thread.interruped方法检测线程T2是否被中断,本次返回了false。(
这里如果检测到T2被中断则清除中断标识返回true,如果返回了true则会调用Thread.currentThread().interrupt();重新还原T2的中断标识。)
但是由于此时T1线程已经unlock,AQS的state变量值变为0了。T2线程成功的获取到了锁,返回true。
unlock方法
执行完业务逻辑之后,在finally中通常会调用比如ReentranLock的unlock方法,这里我们就看下ReentranLock的unlock方法。
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then @link IllegalMonitorStateException is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock()
sync.release(1);
同样是调用了AQS的release方法,其中可以看到tryRelease这个方法,这个方法是ReentranLock提供的。
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if @link #tryRelease returns true.
* This method can be used to implement method @link Lock#unlock.
*
* @param arg the release argument. This value is conveyed to
* @link #tryRelease but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from @link #tryRelease
*/
public final boolean release(int arg)
if (tryRelease(arg))
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
return false;
确切的说,实现了AQS的对象很多都提供了这个方法。而AQS抽象类中提供了一个抛出异常的实现:
protected boolean tryRelease(int arg)
throw new UnsupportedOperationException();
//ReentranLock提供
protected final boolean tryRelease(int releases)
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0)
free = true;
setExclusiveOwnerThread(null);
setState(c);
return free;
这里的逻辑很明显了,就是要释放state和独占线程这两个资源给其他线程用了。回到AQS的release方法,如果tryRelease执行成功,则会对AQS队列中下一个Node进行unparkSuccessor操作。AQS提供了实现:
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node)
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0)
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
if (s != null)
LockSupport.unpark(s.thread);
可以看到,在找到了后继者之后唤醒了这个节点的线程:
LockSupport.unpark(s.thread);
公平锁和非公平锁的区别
代码一贴,疑惑全解。
/**以上是关于源码剖析:AQS-AbstractQueuedSynchronizer的主要内容,如果未能解决你的问题,请参考以下文章
java集合框架源码剖析系列java源码剖析之TreeMap