AQS
Posted dushenzi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS相关的知识,希望对你有一定的参考价值。
简单解释一下J.U.C,即JDK中提供的并发工具包 java.util.concurrent 。里面提供了很多并发编程中很常用的实用工具类,比如:atomic原子操作、同步lock锁、fork/join等。
以Lock作为切入点
本文我们以Lock作为切入点来讲解AQS,如果单独讲解AQS的原理,比较晦涩难懂。毕竟,同步锁是解决线程安全问题的通用手段,也是我们工作中比较常用的方式。
Lock API
Lock是一个接口,方法定义如下:
public interface Lock { // 如果锁可用,就获取锁;如果锁不可用,就阻塞到锁释放 void lock(); // 与lock方法类似,但是阻塞的线程可以中断,抛出InterruptedException void lockInterruptibly() throws InterruptedException; // 非阻塞的方式获取锁,如果获取成功返回true;获取失败返回false boolean tryLock(); // 带有超时事件的非阻塞锁获取方法 boolean tryLock( long time, TimeUnit unit) throws InterruptedException; // 释放锁 void unlock(); } |
Lock接口常见实现类
- ReentrantLock:重入锁,它是唯一一个直接实现Lock接口的类。重入锁是指,当前线程在获取锁之后,再次获取锁的时候无需阻塞,直接将关联的计数器+1即可。
- ReentrantReadWriteLock:可冲入读写锁,它实现了ReadWriteLock接口。在这个类中维护了两把锁:ReadLock和WriteLock。读写锁是一种适合读多写少场景下,解决线程安全问题的工具,基本原则是:读和读不互斥,读和写互斥,写和写互斥。例如,我们可以用读写锁来实现RPC框架中,从注册中心同步服务提供方信息的场景。
ReentrantLock的简单使用
public class ReentrantLockDemoTest { private static ReentrantLock reentrantLock = new ReentrantLock(); private static int index = 0 ; private static void incr() { reentrantLock.lock(); try { Thread.sleep(1000L); index = index + 1 ; } catch (Exception e) { e.printStackTrace(); } finally { reentrantLock.unlock(); } } } |
这段代码只做了一件事,对静态变量index做自增。在没有同步锁的情况下,多线程操作会引发线程安全问题。所以用ReentrantLock来实现同步锁,并且在finally中释放同步锁。
那我们引出一个问题:
在多线程场景下,多个线程竞争lock锁;如果竞争失败,线程是如何实现等待和唤醒的呢?
什么是AQS?
AQS的全称是AbstractQueuedSynchronizer(抽象队列同步器),它提供了一个FIFO先进先出队列,可以看成是一个用来实现同步锁以及其他涉及到同步功能的核心组件。如Lock,CountDownLatch等。
AbstractQueuedSynchronizer是一个抽象组件,主要通过继承的方式来使用。它本身没有实现任何同步接口,仅仅是定义了同步状态的获取以及释放的方法,提供给自定义组件使用。
AQS的两种功能
- 独占锁:每次只能有一个线程持有锁。涉及到的抽象方法是:tryAcquire tryRelease。例如ReentrantLock就是独占互斥锁。
- 共享锁:允许多个线程同时持有锁,并发访问共享资源。涉及到的抽象方法是:tryAcquireShared tryReleaseShared。例如ReentrantReadWriteLock
ReentrantLock的类图
我们仍然以ReentrantLock为例,来分析AQS在重入锁中的使用。毕竟单纯的分析AQS没有太多的意义。先理解这个类图,可以让我们对整体类的结构有一个概念,方面我们理解AQS
以非公平锁为例:
AQS的内部实现
AQS的实现依赖内部的同步队列,也就是FIFO的双向队列。如果当前线程竞争锁失败,那么AQS会将当前线程以及等待状态信息构造成一个Node节点,加入到同步队列的尾部;同时,阻塞该线程。当获取锁的线程释放锁之后,会从队列中唤醒一个阻塞的节点。
AQS内部维护了一个FIFO的双向链表,这种数据结构的特点是每个节点都有两个指针。分别指向直接的后继节点和直接的前驱节点。所以双向链表可以从任何一个节点开始很方便的找到前驱与后继。每个Node其实是对线程的封装,竞争锁失败后会封装为Node并加入到AQS队列中。
释放锁以及添加Node对与队列的变化
添加节点
这里涉及两个变化:
- 新的线程封装成Node节点追加到同步队列中,设置prve节点已经修改当前tail节点的next指向自己(数据结构双向链表)
- 通过CAS将tail重新设置为新的尾部节点
释放锁移除节点
这个过程涉及到两个变化
- 修改head节点指向下一个获取锁的节点
- 新的获取锁的节点,移除prve指针的指向
AQS的源码解析
在搞清楚AQS整体架构以后,我们来分析一下AQS的源码,仍然以ReentrantLock为例。
ReentrantLock整体时序图
我们以ReentrantLock中的lock方法流程为例,源码的调用过程时序图如下:
从上面的流程可以看到,如果获取锁失败后,会调用addWaiter将Node加入到AQS尾部。基于这个思路,我们来分析AQS的源码实现
源码分析
加锁流程
ReentrantLock.lock()
public void lock() { sync.lock(); } |
这个方法是获取锁的入口,调用内部类Sync里面的方法,我们来看下Sync这个类的定义:
abstract static class Sync extends AbstractQueuedSynchronizer |
Sync是一个静态内部类,继承了AbstractQueuedSynchronizer这个抽象类。前面说过AQS是一个同步工具,主要来实现同步控制,我们在这里利用这个工具的时候,会继承他的同步控制功能。
进一步发现,Sync这个类有两个实现类,分别是:NonfairSync(非公平锁)、FairSync(公平锁)。
- 非公平锁表示可以存在抢占锁的行为,也就是说不管当前等待队列中到底有没有其他线程等待,新线程都有机会抢占锁
- 公平锁标识所有线程都按照FIFO先来后到获取锁
因为我们常用的是非公平锁,所以本文我们也以非公平锁作为主要分析逻辑
NonfairSync.lock()
final void lock() { // 通过CAS修改state的值,标识争抢锁 if (compareAndSetState( 0 , 1 )) // 争抢成功,设置当前锁状态的线程 setExclusiveOwnerThread(Thread.currentThread()); else // 非公平锁,再次尝试竞争锁 acquire( 1 ); } |
这段代码的逻辑简单解释一下:
- 调用lock方法时,先通过CAS去抢占锁
- 如果抢占锁成功,保存获取锁成功的当前线程
- 如果抢占失败,调用acquire来走竞争锁逻辑
acquire
acquire是AQS中的方法,如果CAS没有成功,只能说明state已经不是0;此时继续acquire(1)操作,将state置为1,或者重入累加1:
public final void acquire( int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } |
这个方法的主要逻辑:
- 通过tryAcquire尝试获取独占锁,如果成功返回true,结束流程
- 如果tryAcquire失败,则会通过addWaiter方法将当前线程封装为Node添加到AQS队列的队尾
- acqireQueued,将Node作为参数,通过自旋尝试获取锁
NonfairSync.tryAcquire()
这个方法的作用是尝试获取锁
它是重写了AQS的tryAcquire方法,实现中调用了Sync中的nonfairTryAcquire方法
protected final boolean tryAcquire( int acquires) { return nonfairTryAcquire(acquires); } |
nonfairTryAcquire
实现如下:
/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire( int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); int c = getState(); // 获取state的值 // 如果state的值为0,标识当前是无锁状态 if (c == 0 ) { // 使用cas将state的状态置为1,使用cas的原因是该操作是原子性的,避免直接修改state的线程安全问题 if (compareAndSetState( 0 , acquires)) { // cas设置成功,表示该线程顺利拿到锁,保存当前线程信息 setExclusiveOwnerThread(current); return true ; } } // 如果是同一个线程来获取锁,直接增加重入次数 else if (current == getExclusiveOwnerThread()) { // 这里为什么不用cas?同一个线程操作,不需要保证线程安全 int nextc = c + acquires; if (nextc < 0 ) // overflow throw new Error( "Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } |
addWaiter
当tryAcquire方法获取锁失败后,则会先调用addWaiter将当前线程封装为Node,添加到AQS队列尾
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { // 将当前线程封装为Node,并且是独占锁模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // tail是AQS中标识队尾的属性,刚开始为null,此时就需要直接进行enq方法 Node pred = tail; if (pred != null ) { // tail不为空的情况,说明队列中有节点存在 // 将当前节点的prve设置为tail node.prev = pred; // 通过cas将node节点设置为队尾 if (compareAndSetTail(pred, node)) { // 设置成功,将旧的tail节点的next指向新的tail pred.next = node; return node; } } // tail为空或者设置队尾失败,通过自旋方式将当前节点加入到队列中 enq(node); return node; } |
- 将当前线程封装为Node节点
- 判断当前AQS中的tail是否为null,如果不为null,通过cas操作把当前线程的node添加到aqs队列
- 如果tail为null或者cas失败,调用enq自旋将节点添加到AQS中
enq
enq是通过自旋操作将节点加入到队列中
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node‘s predecessor */ private Node enq( final Node node) { // 自旋 for (;;) { // 如果是第一次添加队列,tail为null Node t = tail; if (t == null ) { // Must initialize // CAS创建一个空Node,作为头结点 if (compareAndSetHead( new Node())) // 此时队列中只有一个节点,tail指向他 tail = head; } else { // 第二次循环,tail不为空,进入else逻辑。将当前线程的prve指向tail节点 node.prev = t; // 然后通过cas将tail指向当前节点 if (compareAndSetTail(t, node)) { // 如果cas成功,tail已经指向了当前的node。此时将之前的tail节点的next指向当前node。如果是多个线程,一个线程成功的话,下一个线程继续自旋该流程 t.next = node; return t; } } } } |
假设有两个线程同时进入到enq的流程,一个线程cas成功,另外一个线程失败的话,进入到下一次循环,直到将自己设置成功为止
acquireQueued
将加入队列成功的节点,作为参数传入,在这里面做线程锁抢占操作
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued( final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; // 自旋抢占锁,为什么自旋?如果将线程从用户态变成内核态等待,这个过程的成本很高 for (;;) { // 获取当前节点的prve,如果为null,抛出NPE final Node p = node.predecessor(); // 如果prve是head,才有资格抢占锁。这个是入队后的抢锁操作,和入队前不一样。 if (p == head && tryAcquire(arg)) { // 获取锁成功后,不需要做同步操作,直接将head设置为当前的node setHead(node); p.next = null ; // help GC failed = false ; return interrupted; } // 如果获取锁失败,根据节点的waitStatus决定是否需要挂起线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 如果前面为true,判断线程下次唤醒的时候是否检测中断标志 interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } |
- 获取当前节点的前置prve节点
- 如果prve是head节点,才有资格争抢锁,调用tryAcquired抢占锁
- 抢占锁成功后,把获得锁的节点设置为node,并移除原来的head对应的node节点
- 如果抢占锁失败,根据waitStatus判断线程是否需要挂起
- 最后,如果异常,取消操作
其实前面的逻辑都很好理解,主要看下shouldParkAfterFailedAcquire和parkAndCheckInterrupt的作用
shouldParkAfterFailedAcquire
从上面的分析我们可以看出,非公平锁在入队前会再做一次抢占锁的过程;而队列中,只有第二个节点可以有机会抢占锁,如果成功获取锁,则此节点晋升为头结点。对于第三个及以后的节点,要先进行shouldParkAfterFailedAcquire操作。
shouldParkAfterFailedAcquire方法是判断一个抢占锁的线程,是否应该被阻塞。它先判断一个节点的前置节点是否为SIGNAL,如果是,说明该线程可以安全的阻塞,并在锁释放的时候通知到,返回true
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node‘s predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前继节点的状态 // 如果是SIGNAL状态,意味着当前线程需要被unpark唤醒 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true ; // 如果前继节点的状态大于0-CANCELLED状态 if (ws > 0 ) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ // 将CANCELLED的Node从队列中剔除掉 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don‘t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 将前继节点的状态设置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } |
parkAndCheckInterrupt
如果上一步shouldParkAfterFailedAcquire返回了true,则会执行parkAndCheckInterrupt。它是通过LockSupport的park将线程挂起到WAITING状态,它需要等待一个中断或者unpark方法唤醒。
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park( this ); return Thread.interrupted(); } |
此时,没有获取到锁的线程在队列里等待获取和竞争锁
锁的释放流程
ReentrantLock.unlock()
加锁流程分析完成后,再来分析一下锁的释放流程。
/** * Attempts to release this lock. * * <p>If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ public void unlock() { sync.release( 1 ); } |
主要是调用Sync的Release方法,在这个方法里主要做两件事:1.释放锁 2.唤醒park的线程
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release( int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } |
tryRelease
这个动作可以认为就是一个设置锁状态的操作,而且是将状态减掉传入的参数值1,如果结果状态是0,就将排它锁的Owner设置为null,以使得其他线程有机会进行执行。
在排它锁中,加锁的时候状态会+1,解锁的时候会-1。同一个锁,如果是可重入的,state会一直叠加。只有unlock的次数与lock的次数对应,才会将当前锁的owner置为空,而且也只有这个时候会返回true
protected final boolean tryRelease( int releases) { // 将锁的数量-1 int c = getState() - releases; // 如果释放锁的线程和当前获得锁的线程不一致,抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { // 由于重入的关系,不是每次释放锁之后c都为0,直到最后c=0,才会把当前线程释放 free = true ; setExclusiveOwnerThread( null ); } setState(c); return free; } |
unparkSuccessor
在这个方法中,就是真正的释放锁,它传入head节点(head节点就是占用锁的线程节点),当前线程释放锁之后,要唤醒下一个节点的线程
/** * Wakes up node‘s successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0 ) { //判断后继节点是否为空,或者是取消状态 s = null ; // 从尾部向前遍历找到最前面一个waitStatus<0的节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) // 释放锁许可 LockSupport.unpark(s.thread); } |
总结
这篇文章主要基于非公平重入锁来分析AQS的实现机制。在获取同步锁时,AQS维护一个同步队列,获得锁失败的线程都会被加入到队列尾并在队列中做自旋;移除队列(或者停止自旋)的条件是前驱节点或者获取了同步状态。在释放同步状态时,唤醒头结点的后继节点。
以上是关于AQS的主要内容,如果未能解决你的问题,请参考以下文章