JUC系列LOCK框架系列之五 核心锁类AbstractQueuedSynchonizer
Posted 顧棟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC系列LOCK框架系列之五 核心锁类AbstractQueuedSynchonizer相关的知识,希望对你有一定的参考价值。
初步认识AbstractQueuedSynchonizer
文章目录
源码采用JDK8
什么是AQS
AQS是用来构建锁或者其他同步组件的基础框架,它使用了一个int的变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
它的主要使用方式是继承,子类通过继承AQS并实现他的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行修改,AQS为此提供了三个方法(getState()
、setState(int newState)
、compareAndSetState(int expect, int update)
)来操进行作操作,它们可以保证修改是安全的。使用自定义同步组件的静态内部类来实现子类,同步器自身没有实现任何同步接口,它仅仅定义了一些关于同步状态获取和释放的方法来使用。AQS既支持抢占式的获取同步状态,也支持共享式的获取同步状态。
AQS的接口和示例
AQS的设计基于模板方法模式的,这就需要我们继承AQS并重写指定的方法,随后AQS组合在自定义的组件实现中,并调用同步器提供的模板方法,而这些模板方法会调用我们重写的方法。子类需要重写的三个方法:
- getState() :获取当前同步状态
- setState(int newState):设置当前同步状态
- compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性
AQS可以被重写的方法
方法名称 | 描述 |
---|---|
protected boolean tryAcquire(int arg) | 独占式获取同步状态,实现的时候需要查询当前状态并判断是否符合预期,再使用CAS设置同步状态 |
protected boolean tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
protected int tryAcquireShared(int arg) | 共享式获取同步状态,返回大于等于0的值,表示获取成功,否知失败 |
protected boolean tryReleaseShared(int arg) | 共享式释放同步状态 |
protected boolean isHeldExclusively() | 当前AQS是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 |
AQS提供的模板方法基本分为3类
- 独占式获取与释放同步状态
- 共享式获取与释放同步状态
- 查询同步队列中的等待线程情况
方法名称 | 描述 |
---|---|
void acquire(int arg) | 独占式获取同步状态,如果当前线程获取同步状态成功则返回,否则进入同步队列等待,该发放会去调用重写的tryAcquire(int arg) |
void acquireInterruptibly(int arg) throws InterruptedException | 与acquire(int arg)的差异是 这个方法响应中断,当前线程在等待队列中,如果被中断,那么该方法会抛出异常并返回 |
boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException | 在acquireInterruptibly(int arg)基础上增加了超时机制,在规定时间内取不到同步状态,也会以false返回。 |
boolean release(int arg) | 独占式释放同步状态,在释放同步状态后,会将等待队列的第一个节点对应的线程唤醒 |
void acquireShared(int arg) | 共享式获取同步状态,如果当前线程获取同步状态成功则返回,否则进入同步队列等待,该发放会去调用重写的tryAcquire(int arg),与独占式不同,可以有多个线程获取到同步状态。 |
void acquireSharedInterruptibly(int arg) throws InterruptedException | 与acquireShared(int arg)的差异是 这个方法响应中断,当前线程在等待队列中,如果被中断,那么该方法会抛出异常并返回 |
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException | 在acquireSharedInterruptibly(int arg)基础上增加了超时机制,在规定时间内取不到同步状态,也会以false返回。 |
boolean releaseShared(int arg) | 共享式释放同步状态, |
Collection getQueuedThreads() | 获取等待队列中的线程集合 |
官网例子
在JDK1.8源码里面有提供了一个独占锁的实现例子Mutex自定义同步组件,它在同一时刻只能有一个线程能获取到锁。在Mutex中定义一个静态内部类Sync。
Sync继承了AQS,并使用getState()
、setState(int newState)
、compareAndSetState(int expect, int update)
重写了isHeldExclusively()
、tryAcquire(int acquires)
、tryRelease(int releases)
方法。
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Mutex implements Lock, java.io.Serializable
// 继承AQS的子类,静态内部类形式
private static class Sync extends AbstractQueuedSynchronizer
// 是否是独占的
@Override
protected boolean isHeldExclusively()
return getState() == 1;
// 当同步状态是0时,获取锁
@Override
public boolean tryAcquire(int acquires)
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
return true;
return false;
// 释放锁将 同步状态改为0
@Override
protected boolean tryRelease(int releases)
assert releases == 1; // Otherwise unused
if (getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
// 提供一个ConditionObject
Condition newCondition()
return new ConditionObject();
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException
s.defaultReadObject();
setState(0); // reset to unlocked state
// sync完成所有艰苦的工作。 我们是需要将操作代理到sync上。
private final Sync sync = new Sync();
@Override
public void lock()
sync.acquire(1);
@Override
public void lockInterruptibly() throws InterruptedException
sync.acquireInterruptibly(1);
@Override
public boolean tryLock()
return sync.tryAcquire(1);
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
return sync.tryAcquireNanos(1, unit.toNanos(time));
@Override
public void unlock()
sync.release(1);
@Override
public Condition newCondition()
return sync.newCondition();
编写测试MutexTest类,会使用AQS的模板方法进行操作。通过lock()
进行获取锁(内部调用了Sync的tryAcquire),unlock()
(Sync的tryRelease)解锁。
public class MutexTest
public static void main(String[] args)
Mutex lock = new Mutex();
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++)
executorService.execute(() ->
lock.lock();
try
System.out.println(Thread.currentThread() + " get lock");
Thread.sleep(1000);
catch (InterruptedException e)
e.printStackTrace();
finally
System.out.println(Thread.currentThread() + " release lock");
lock.unlock();
);
executorService.shutdown();
通过结果,可以看出一次只有一个线程获取到锁。
Thread[pool-1-thread-1,5,main] get lock. 17:21:23
Thread[pool-1-thread-1,5,main] release lock. 17:21:24
Thread[pool-1-thread-2,5,main] get lock. 17:21:24
Thread[pool-1-thread-2,5,main] release lock. 17:21:25
Thread[pool-1-thread-3,5,main] get lock. 17:21:25
Thread[pool-1-thread-3,5,main] release lock. 17:21:26
Thread[pool-1-thread-4,5,main] get lock. 17:21:26
Thread[pool-1-thread-4,5,main] release lock. 17:21:27
Thread[pool-1-thread-5,5,main] get lock. 17:21:27
Thread[pool-1-thread-5,5,main] release lock. 17:21:28
AQS实现分析
核心思想
如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制在AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。队列有同步队列(sync queue)和条件队列(condition queue)。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列,即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
我的理解就是没有用类似QUEUE那样的队列的实例,而是通过NODE中存放前后结点PreNode和NextNode形成一种双向链表似的关系
sync queue 同步队列
AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS操作对该同步状态进行原子操作实现对其值的修改。CAS操作主要借助sun.misc.Unsafed类来实现。
// 代表同步状态的变量
private volatile int state;
同步队列中的结点用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继结点,结点的属性类型与名称, 源码如下
static final class Node
// 模式,分为共享与独占
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 结点状态的值
// CANCELLED,值为1,表示当前的线程被取消(由于等待超时或者中断)
// SIGNAL,值为-1,表示当前结点的后继结点包含的线程需要唤醒(处于等待状态),也就是unpark。当前结点的线程释放同步状态或者取消了,将唤醒后继结点的线程
// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中。
// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。共享式同步状态获取的时候将会无条件传播下去
// 值为0,初始值。表示当前节点在sync队列中,等待着获取锁
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 结点状态
volatile int waitStatus;
/**
* 前驱结点
*/
volatile Node prev;
/**
* 后继结点
*/
volatile Node next;
/**
* 结点所对应的线程
*/
volatile Thread thread;
/**
* 下一个等待者 后继结点
*/
Node nextWaiter;
/**
* 节点是否在共享模式下等待 当Returns true if node is waiting in shared mode.
*/
final boolean isShared()
return nextWaiter == SHARED;
/**
* 获取前驱结点,若前驱结点为空,抛出异常
*/
final Node predecessor() throws NullPointerException
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
//无参构造方法
Node() // 用于建立初始头部或共享标记
Node(Thread thread, Node mode) // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
Node(Thread thread, int waitStatus) // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
同步队列的数据结构
在AQS中有两个结点类型的引用,head是指向头结点(状态值不会是1),tail是指向尾结点。
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
独占式同步状态获取与释放
以上述官网独占式获取同步状态为例,在使用sync.acquire(1);
获取同步状态失败的时候,会执行addWaiter(Node.EXCLUSIVE), arg)
,先通过compareAndSetTail(pred, node)
尝试快速填充队尾,如果填充失败或者当没有尾结点时,去调用enq(final Node node)
进行队列初始化,通过compareAndSetHead(Node update)
和compareAndSetTail(Node expect, Node update)
来设置AQS的head结点和tail结点。在完成addWaiter之后,继续执行acquireQueued(final Node node, int arg)
,如果当前结点的前驱结点是首结点,再次尝试获同步状态,若成功,将当前结点更新head结点,若失败,进行线程park,等待前驱结点释放锁唤醒当前线程,若期间当前线程中断,也会被唤醒。若发生异常情况, 会通过cancelAcquire(Node node)
取消继续获取(资源)。
锁获取的主要流程图如下:
独占锁的释放主要通过调用release(int arg)
来释放锁。
源码部分
addWaiter(Node mode)
private Node addWaiter(Node mode)
// 为当前线程构建一个Node,独占模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 当队列尾结点不为空,快速填充队尾
if (pred != null)
node.prev = pred;
// 比较pred是否为尾结点,是则将尾结点设置为node
if (compareAndSetTail(pred, node))
pred.next = node;
return node;
// 尾结点为空(即还没有被初始化过),或者是compareAndSetTail操作失败,则入队列
enq(node);
return node;
enq(final Node node)
private Node enq(final Node node)
// 无限循环 势必将结点加入队列中
for (;;)
// 获取AQS当前尾结点
Node t = tail;
// 如果尾结点是null,则进行初始化,新建一个空Node同时作为head结点和tail结点
if (t == null) // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
else
// 尾结点不为空,即已经被初始化过
// 将当前尾结点作为node结点的前置结点
node.prev = t;
// 比较结点t是否为尾结点,若是则将尾结点设置为node
if (compareAndSetTail(t, node))
t.next = node;
return t;
acquireQueued(final Node node, int arg)
// 以独占不间断模式获取已在队列中的线程。
final boolean acquireQueued(final Node node, int arg)
boolean failed = true;
try
// 中断标志
boolean interrupted = false;
// 无限循环
for (;;)
// 获取前置结点
final Node p = node.predecessor();
// 如果前置结点是head且当前线程成功获取到同步状态,将自身结点变为head结点,返回中断标记
if (p == head && tryAcquire(arg))
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
// 当获取资源失败,更新结点状态并阻塞线程,返回其中断标识
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
finally
// 取消加入队列失败的节点的资源获取
if (failed)
cancelAcquire(node);
shouldParkAfterFailedAcquire(Node pred, Node node)
// 当获取(资源)失败后,检查并且更新结点状态--只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
// 前置结点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前置节点已经设置了使后置结点阻塞等待的信号,因此它可以安全地park。
*/
return true;
if (ws > 0)
/*
* 前置结点已经取消了等待该锁,从前置结点向前遍历,找到未取消的节点,设置为当前节点的前置结点
*/
do
node.prev = pred = pred.prev;
while (pred.waitStatus > 0);
pred.next = node;
else
/*
* waitStatus必须为0或PROPAGATE。我们需要信号,不是立即park。调用者将需要重试,以确保在park前。它不能获得同步状态。
* 尝试将前驱结点的信号变为SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
return false;
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt()
// 将其线程阻塞--线程被唤醒后或中断后会在此后继续执行
LockSupport.park(this);
// 返回当前线程是否已被中断,并对中断标识位进行复位
return Thread.interrupted();
cancelAcquire(Node node)
// 取消继续获取(资源)
private void cancelAcquire(Node node)
// 忽略结点已经不存在的情况
if (node == null)
return;
// 清空node结点的thread
node.thread = null;
// Skip cancelled predecessors
// 保存node的前驱结点,如果前驱节点已经是取消的状态,则一直向前遍历,取不是取消状态的结点作为当前结点的前驱结点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent no以上是关于JUC系列LOCK框架系列之五 核心锁类AbstractQueuedSynchonizer的主要内容,如果未能解决你的问题,请参考以下文章
JUC系列LOCK框架系列之六 核心锁类之ReentrantLock
JUC系列LOCK框架系列之三 同步工具类 CountDownLatch
JUC系列LOCK框架系列之四 同步工具类Semaphore