AQS
Posted angelica-duhurica
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS相关的知识,希望对你有一定的参考价值。
java.util.concurrent.locks.AbstractQueuedSynchronizer
ReentrantLock
、Semaphore
、CountDownLatch
都有一个内部类Sync
,而所有的Sync都是继承自AbstractQueuedSynchronizer
。
AQS核心是通过一个共享变量来同步状态,变量的状态由子类去维护,而AQS框架做的是:
- 线程阻塞队列的维护
- 线程阻塞和唤醒
共享变量的修改都是通过Unsafe
类提供的CAS操作完成的。AbstractQueuedSynchronizer类的主要方法是acquire
和release
,典型的模板方法, 下面这4个方法由子类去实现:
// Main exported methods
protected boolean tryAcquire(int arg) throw new UnsupportedOperationException();
protected boolean tryRelease(int arg) throw new UnsupportedOperationException();
protected int tryAcquireShared(int arg) throw new UnsupportedOperationException();
protected boolean tryReleaseShared(int arg) throw new UnsupportedOperationException();
acquire方法用来获取锁,返回true说明线程获取成功继续执行,一旦返回false则线程加入到等待队列中,等待被唤醒,release方法用来释放锁。 一般来说实现的时候这两个方法被封装为lock
和unlock
方法。
等待的线程是按照阻塞时的顺序依次获取到锁的,这是因为AQS是基于CLH lock queue
的一个变种来实现线程阻塞队列的。
CLH lock queue
CLH lock queue
其实就是一个FIFO的队列,队列中的每个结点(线程)只要等待其前继释放锁就可以了。
通常就是用CLH lock queue
来实现自旋锁(spin lock),简单来说就是线程通过循环来等待而不是睡眠。
AQS中线程不是一直在自旋的,而可能会反复的睡眠和唤醒,这就需要前继释放锁的时候通过next 指针找到其后继将其唤醒,也就是AQS的等待队列中后继是被前继唤醒的。AQS结合了自旋和睡眠/唤醒两种方法的优点。其中线程的睡眠和唤醒就是用到LockSupport
。
LockSupport
阻塞和唤醒是对于线程来说的,LockSupport的park/unpark更符合这个语义,以“线程”作为方法的参数, 语义更清晰。Object
对象的wait和notify方法的实现使得“线程”的阻塞/唤醒对线程本身来说是被动的。
LockSupport
并不需要获取对象的监视器。LockSupport机制是每次unpark
给线程有且仅有1个许可,而park
则相反,如果当前线程有许可,那么park方法会消耗许可并返回,否则会阻塞线程直到线程重新获得许可,在线程启动之前调用park/unpark
方法没有任何效果。
用来创建锁和其他同步类的基本线程阻塞原语。
// 解除阻塞线程,不会遇到Thread.resume所可能引发的死锁问题。
public static void unpark(Thread thread)
if (thread != null)
UNSAFE.unpark(thread);
// 阻塞线程,不会遇到Thread.suspend所可能引发的死锁问题。
// 和Object的wait一样也能响应中断,但是跟Thread.sleep()不同的是它不会抛出InterruptedException。
public static void park(Object blocker)
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
// 保证在park(Object blocker)整个函数执行完后,该线程的parkBlocker字段又恢复为null
setBlocker(t, null);
private static final sun.misc.Unsafe UNSAFE;
AbstractQueuedSynchronizer
抽象类,其为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。
底层的数据结构是使用双向链表,是队列的一种实现,故也可看成是队列,其中Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。
static final class Node
// 标记当前结点是共享模式
static final Node SHARED = new Node();
// 标记当前结点是独占模式
static final Node EXCLUSIVE = null;
// 表示当前的线程被取消
static final int CANCELLED = 1;
// 表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作
static final int SIGNAL = -1;
// 表示当前节点在condition queue中,在等待condition
static final int CONDITION = -2;
// 代表后续结点会传播唤醒的操作,共享模式下起作用
static final int PROPAGATE = -3;
// 结点的等待状态
volatile int waitStatus;
volatile Node prev;
volatile Node next;
// 拥有当前结点的线程
volatile Thread thread;
Node nextWaiter;
final boolean isShared()
return nextWaiter == SHARED;
final Node predecessor() throws NullPointerException
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
Node() // Used to establish initial head or SHARED marker
Node(Thread thread, Node mode) // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
Node(Thread thread, int waitStatus) // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
// Sync queue,即同步队列,是双向链表
private transient volatile Node head;
private transient volatile Node tail;
// 自旋时间,doAcquireNanos方法的for循环用到了这个时间
static final long spinForTimeoutThreshold = 1000L;
// 实现了Condition接口
public class ConditionObject implements Condition, java.io.Serializable
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
入队
private Node addWaiter(Node mode)
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速入队操作,因为大多数时候尾节点不为 null
Node pred = tail;
// 这个if分支其实是一种优化:CAS操作失败的话才进入enq中的循环。
if (pred != null)
node.prev = pred;
if (compareAndSetTail(pred, node))
pred.next = node;
return node;
// 如果尾节点为空(也就是队列为空) 或者尝试CAS入队失败(由于并发原因),进入enq方法
enq(node);
return node;
private Node enq(final Node node)
for (;;)
Node t = tail;
if (t == null) // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
else
// 可以看到这一部分和上面是重复的
node.prev = t;
if (compareAndSetTail(t, node))
t.next = node;
return t;
出队
private void setHead(Node node)
head = node;
node.thread = null;
node.prev = null;
独占模式获取
// 以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。
// tryAcquire由子类实现本身不会阻塞线程,如果返回true,则线程继续,
// 如果返回false那么就加入阻塞队列阻塞线程,并等待前继结点释放锁
public final void acquire(int arg)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// acquireQueued返回true,说明当前线程被中断唤醒后获取到锁,
// 重置其interrupt status为true。
selfInterrupt();
final boolean acquireQueued(final Node node, int arg)
boolean failed = true;
try
boolean interrupted = false;
// 等待前继结点释放锁
// 自旋re-check
// 支持超时的获取版本
for (;;)
final Node p = node.predecessor();
if (p == head && tryAcquire(arg))
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
// parkAndCheckInterrupt就是用LockSupport来阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
finally
if (failed)
cancelAcquire(node);
独占模式释放
public final boolean release(int arg)
if (tryRelease(arg))
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后续的结点
unparkSuccessor(h);
return true;
return false;
共享模式获取
// 如果没有许可了则入队等待
public final void acquireShared(int arg)
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
private void doAcquireShared(int arg)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try
boolean interrupted = false;
for (;;)
final Node p = node.predecessor();
if (p == head)
int r = tryAcquireShared(arg);
if (r >= 0)
// 获取成功则前继出队,跟独占不同的是,
// 会往后面结点传播唤醒的操作,保证剩下等待的线程能够尽快获取到许可。
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
finally
if (failed)
cancelAcquire(node);
共享模式释放
public final boolean releaseShared(int arg)
if (tryReleaseShared(arg))
doReleaseShared();
return true;
return false;
Condition
public interface Condition
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
ReentrantLock
一个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。
class ReentrantLock implements Lock
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer
// 由子类实现分为公平和非公平
abstract void lock();
// 非公平方式获取
final boolean nonfairTryAcquire(int acquires)
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0)
if (compareAndSetState(0, acquires))
setExclusiveOwnerThread(current);
return true;
else if (current == getExclusiveOwnerThread())
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
return false;
protected final boolean tryRelease(int releases)
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0)
free = true;
setExclusiveOwnerThread(null);
setState(c);
return free;
// 是否被当前线程占有
protected final boolean isHeldExclusively()
return getExclusiveOwnerThread() == Thread.currentThread();
// 获取占有资源的线程
final Thread getOwner()
return getState() == 0 ? null : getExclusiveOwnerThread();
...
static final class NonfairSync extends Sync
final void lock()
// 比较并设置状态成功,状态0表示锁没有被占用
if (compareAndSetState(0, 1))
// 把当前线程设置独占了锁
setExclusiveOwnerThread(Thread.currentThread());
else
// 锁已经被占用,或者set失败
// 以独占模式获取对象,忽略中断
acquire(1);
protected final boolean tryAcquire(int acquires)
return nonfairTryAcquire(acquires);
static final class FairSync extends Sync
final void lock()
// 这里就是调用父类AQS的acquire方法
// 然后在AQS的方法里调用到下面的tryAcquire方法和AQS的addWaiter方法、acquireQueued方法
acquire(1);
protected final boolean tryAcquire(int acquires)
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0)
// 不存在已经等待更久的线程,比较并且设置状态成功
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires))
setExclusiveOwnerThread(current);
return true;
else if (current == getExclusiveOwnerThread())
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
return false;
ReentrantLock的绝大部分操作都是基于AQS类的。
CyclicBarrier
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
// 对外提供的await函数在底层都是调用该了doawait函数
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException, TimeoutException
final ReentrantLock lock = this.lock;
lock.lock();
try
final Generation g = generation;
// 判断屏障是否被破坏
if (g.broken)
throw new BrokenBarrierException();
// 判断线程是否被中断
if (Thread.interrupted())
// 损坏当前屏障,并且唤醒所有的线程
breakBarrier();
throw new InterruptedException();
// 判断等待进入屏障的线程数量
int index = --count;
if (index == 0) // tripped,所有线程都已经进入运行的动作标识
boolean ranAction = false;
try
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 进入下一代,在所有线程进入屏障后会被调用,
// 即生成下一个版本,所有线程又可以重新进入到屏障中,唤醒所有线程
nextGeneration();
return 0;
finally
if (!ranAction)
breakBarrier();
// loop until tripped, broken, interrupted, or timed out
for (;;)
try
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
catch (InterruptedException ie)
if (g == generation && ! g.broken)
breakBarrier();
throw ie;
else
Thread.currentThread().interrupt();
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L)
breakBarrier();
throw new TimeoutException();
finally
lock.unlock();
CountDownLatch
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
典型的用法是将一个程序分为n个互相独立的可解决任务,并创建值为n的CountDownLatch。当每一个任务完成时,都会在这个锁存器上调用countDown,等待问题被解决的任务调用这个锁存器的await,将他们自己拦住,直至锁存器计数结束。
// 内部类Sync
private final Sync sync;
public void await() throws InterruptedException
// 调用了Sync的tryAcquireShared和AQS的doAcquireSharedInterruptibly函数
// tryAcquireShared:试图在共享模式下获取对象状态
sync.acquireSharedInterruptibly(1);
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
// 递减锁存器的计数,如果计数到达零,则在共享模式下释放所有等待的线程
public void countDown()
sync.releaseShared(1);
public long getCount()
return sync.getCount();
CountDownLatch是采用共享模式,而ReentrantLock是采用独占模式。
Semaphore
一个计数信号量,从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个acquire(),然后再获取该许可。每个release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,Semaphore只对可用许可的号码进行计数,并采取相应的行动,不使用实际的许可对象。通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
与ReentrantLock的内部类的结构相同,Sync、NonfairSync、FairSync三个内部类。基于Semaphore对象的操作绝大多数都转移到了对sync的操作。
不会使用到AQS的条件队列。
ReentrantReadWriteLock
读写锁接口ReadWriteLock的实现类,它包括Lock子类ReadLock和WriteLock。ReadLock是共享锁,WriteLock是独占锁。
class ReentrantReadWriteLock implements ReadWriteLock
public interface ReadWriteLock
Lock readLock();
Lock writeLock();
五个内部类:Sync、NonfairSync、FairSync、ReadLock、WriteLock。
写锁数量由state的低十六位表示。读锁数量由state的高十六位表示。
可以实现多个线程同时读,此时,写线程会被阻塞。并且,写线程获取写入锁后可以获取读取锁,然后释放写入锁,这样写入锁变成了读取锁。
参考:
http://zhanjindong.com/tags/#AQS
https://www.cnblogs.com/leesf456/p/5453091.html
以上是关于AQS的主要内容,如果未能解决你的问题,请参考以下文章
AQS(AbstractQueuedSynchronizer)源码深度解析—AQS的设计与总体结构