一文就懂AQS!

Posted 杨 戬

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一文就懂AQS!相关的知识,希望对你有一定的参考价值。

文章目录

看了很多帖子,原理说啥的都有,算了还是自己整理吧,既然没个统一,那我就自己整理一下🤣🤣🤣,刚升博客专家,我说的就对 🧐🧐🧐

AQS介绍

AQS概念

AbstractQueuedSynchronized(AQS),是抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架**,许多同步类实现都依赖于它,例如常用的ReentrantLock/Semaphore/CountDownLatch等

AQS是一个用来构建锁和其他同步组件的基础框架,使用AQS可以简单且高效地构造出应用广泛的同步器,它提供了一个FIFO队列(先进先出队列),可以看成是一个用来实现同步锁以及其他涉及到同步功能的核心组件。

模型整体的工作流程如下所示:

AQS模式分类

AQS支持独占锁(exclusive)和共享锁(share)两种模式。

  • 独占锁:只能被一个线程获取到(例如:Reentrantlock)

    这其中又可分为公平锁和非公平锁:

    • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
    • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
  • 共享锁:可以被多个线程同时获取(例如:CountDownLatch,ReadWriteLock等)

但是无论是独占锁还是共享锁,本质上都是对AQS类内部的一个变量state的获取。

这里的 state是一个原子的int变量,用来表示锁状态、资源数等。

变量state源码示例如下(不急,后面会讲到):

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。

AQS核心思想

AQS的核心思想也很简单:

  • 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态

  • 如果被请求的共享资源被占用,那么就使用一套线程阻塞等待以及被唤醒时锁分配的机制

这个机制AQS是用CLH同步队列锁实现的,即将暂时获取不到锁的线程加入到等待队列中。

AQS源码结构

CLH同步队列

我们的 AQS底层的数据结构采用CLH队列,AQS依赖它来完成同步状态的管理,CLH队列是一个FIFO双向队列,即不存在队列的实例,仅存在节点之间的关联关系

原理如下:AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。当共享资源被某个线程占有(即当前线程获取同步状态失败),AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

如下结构图就是一个CLH同步队列:

在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next)

而在源码的注释中,也能看到这样的结构介绍:

翻译如下:

/**
 * 等待队列节点 class.
 *
* <p>等待队列是“CLH”的变体(Craig、Landin和
* Hagersten)锁定队列。CLH锁通常用于
* 自旋锁。
* ...........
* <p>要排队进入CLH锁,您可以将其作为新的
* 尾部。要退出队列,只需设置head字段。
 * <pre>
 *      +------+  prev +-----+       +-----+
 * head |      | <---- |     | <---- |     |  tail
 *      +------+       +-----+       +-----+
 * </pre>
 * ..............

结点Node源码如下:

static final class Node 
    /** 共享 */
    static final Node SHARED = new Node();

    /** 独占 */
    static final Node EXCLUSIVE = null;

    /**
     * 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
     */
    static final int CANCELLED =  1;

    /**
     * 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
     */
    static final int SIGNAL    = -1;

    /**
     * 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
     */
    static final int CONDITION = -2;

    /**
     * 表示下一次共享式同步状态获取将会无条件地传播下去
     */
    static final int PROPAGATE = -3;

    /** 等待状态:
         * 0 INITAIL: 初始状态
         * 1 CANCELLED: 由于等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态不会被改变
         * -1 SIGNAL: 当前节点释放同步状态或被取消,则等待状态的后继节点被通知
         * -2 CONDITION: 节点在等待队列中,线程在Condition上,需要其它线程调用Condition的signal()方法才能从等待队转移到同步队列
         * -3 PROPAGATE: 表示下一个共享式同步状态将会无条件被传播下去
         */
    volatile int waitStatus;

    /** 前驱节点 */
    volatile Node prev;

    /** 后继节点 */
    volatile Node next;

    /** 获取同步状态的线程 */
    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() 
        return nextWaiter == SHARED;
    

    final Node predecessor() throws NullPointerException 
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    

    Node() 
    

    Node(Thread thread, Node mode) 
        this.nextWaiter = mode;
        this.thread = thread;
    

    Node(Thread thread, int waitStatus) 
        this.waitStatus = waitStatus;
        this.thread = thread;
    

其中首先节点的类型是AQS的静态内部类Node,Node节点的状态有如下四种,AQS中关于Node状态的源码如下

//当前节点由于超时或中断被取消
static final int CANCELLED =  1;

//表示当前节点的前节点被阻塞
static final int SIGNAL    = -1;

//当前节点在等待condition
static final int CONDITION = -2;

//状态需要向后传播
static final int PROPAGATE = -3;
  • CANCELLED = 1:表示当前节点从同步队列中取消,即当前线程被取消

  • SIGNAL = -1:表示后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,使得后继节点的线程能够运行

  • CONDITION = -2:表示当前节点在等待condition,也就是在condition queue中

  • PROPAGATE = -3:表示下一次共享式同步状态获取将会无条件传播下去

state同步状态

AQS中的共享资源是使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。

AQS使用CAS对该同步状态进行原子操作实现对其值的修改。状态信息通过procted类型的getState,setState,compareAndSetState进行操作。

AQS中同步状态获取的源码如下:

//同步状态,使用volatile来保证其可见性
private volatile int state;
//获取同步状态
protected final int getState() 
	return state;

//设置同步状态
protected final void setState(int newState) 
	state = newState;


//原子性地修改同步状态
protected final boolean compareAndSetState(int expect, int update) 
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);

因为 AQS 也只是一个抽象类,这个state在他的子类中,可以用于表示任意状态,例如:

  • ReentrantLock用它来表示锁的持有者线程已经重复获取该锁的次数,而对于非锁的持有者线程来说,如果state大于0,意味着无法获取该锁,将该线程包装为Node,加入到同步等待队列里。
  • Semaphore用它来表示剩余的许可数量,当许可数量为0时,对未获取到许可但正在努力尝试获取许可的线程来说,会进入同步等待队列,阻塞,直到一些线程释放掉持有的许可(state+1),然后争用释放掉的许可。
  • FutureTask用它来表示任务的状态(未开始、运行中、完成、取消)。
  • ReentrantReadWriteLock在使用时,稍微有些不同,int型state用二进制表示是32位,前16位(高位)表示为读锁,后面的16位(低位)表示为写锁。
  • CountDownLatch使用state表示计数次数,state大于0,表示需要加入到同步等待队列并阻塞,直到state等于0,才会逐一唤醒等待队列里的线程。

在AQS类中没有同步器方法的具体实现,所以我们来具体看其子类的源码的实现原理

  • 独占式同步状态获取与释放
  • 共享式同步状态获取与释放

独占式同步状态获取与释放

首先同步状态的获取需要通过调用同步器acquire(int arg)方法可以获取同步状态,该方法中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后序线程对进行中断操作时,线程不会从同步队列中移出

public final void acquire(int arg) 
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();

这个同步状态获取主要的流程步骤如下:

1)首先调用自定义同步器实现tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态

2)如果获取失败则构造同步节点(独占式Node.EXCLUSIVE)并通过addWaiter(Node ndoe)方法将该节点加入到同步队列的尾部,同时调用enq(node)通过for(;;)循环保证安全设置尾节点。

CLH队列入列也很简单,和数据结构一样就是tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点。

如下是addWaiter(Node node)方法的源码:

private Node addWaiter(Node mode) 
    //新建Node
    Node node = new Node(Thread.currentThread(), mode);
    //快速尝试添加尾节点
    Node pred = tail;
    if (pred != null) 
        node.prev = pred;
        //CAS设置尾节点
        if (compareAndSetTail(pred, node)) 
            pred.next = node;
            return node;
        
    
    //多次尝试
    enq(node);
    return node;

addWaiter(Node node)先通过快速尝试设置尾节点,如果失败,则调用enq(Node node)方法设置尾节点

private Node enq(final Node node) 
    //多次尝试,直到成功为止
    for (;;) 
        Node t = tail;
        //tail不存在,设置为首节点
        if (t == null) 
            if (compareAndSetHead(new Node()))
                tail = head;
         else 
            //设置为尾节点
            node.prev = t;
            if (compareAndSetTail(t, node)) 
                t.next = node;
                return t;
            
        
    

在上面代码中,两个方法都是通过一个CAS方法compareAndSetTail(Node expect, Node update)来设置尾节点,该方法可以确保节点是线程安全添加的。

在enq(Node node)方法中,AQS通过“死循环(自旋,死循环是一种表象的说法)”的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去。

过程图如下:

后面黄色的Node是我们新加的结点,获取同步失败后被加到队尾

3)节点进入同步队列之后“自旋”,即acquireQueued(final Node node, int arg)方法,在这个方法中,当前node死循环尝试获取锁状态,但是只有node的前驱结点是Head才能尝试获取同步状态,获取成功之后立即设置当前节点为Head,并成功返回。否则就会一直自旋。

源码如下:

final boolean acquireQueued(final Node node, int arg) 
    boolean failed = true;
    try 
        boolean interrupted = false;
        for (;;) 
            final Node p = node.predecessor();
            // 当前node节点的前驱是Head时(p == head),才能有资格去尝试获取同步状态(tryAcquire(arg))
            // 这是因为当前节点的前驱结点获得同步状态,才能唤醒后继节点,即当前节点
            if (p == head && tryAcquire(arg))  // 以上条件满足之后
                setHead(node); // 设置当前节点为Head
                p.next = null; // help GC // 释放ndoe的前驱节点
                failed = false;
                return interrupted;
            
            // 线程被中断或者前驱结点被释放,则继续进入检查:p == head && tryAcquire(arg
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        
     finally 
        if (failed)
            cancelAcquire(node);
    

文字总结:

1)同步器会维护一个双向FIFO队列,获取同步失败的线程将会被构造成Node加入队尾(并且做自旋检查:检查前驱结点是否是Head);

2)当前线程想要获得同步状态,前提是其前驱结点是头结点,并且获得了同步状态;

3)当Head调用release(int arg)释放锁的同时会唤醒后继节点(即当前节点),后继节点结束自旋

出队列过程:

CLH同步队列遵循FIFO,首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单,head执行该节点并断开原首节点的next和当前节点的prev即可,注意在这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态。

同步器的release方法:释放锁的同时,唤醒后继节点(进而时后继节点重新获取同步状态)

public final boolean release(int arg) 
   if (tryRelease(arg)) 
       Node h = head;
       if (h != null && h.waitStatus != 0)
           // 该方法会唤醒Head节点的后继节点,使其重试尝试获取同步状态
           unparkSuccessor(h);
       return true;
   
   return false;

UnparkSuccessor(Node node)方法使用LookSupport(LockSupport.unpark)唤醒处于等待状态的线程。

过程图如下:

流程图总结:

共享式同步状态获取与释放

共享锁跟独占式锁最大的不同就是:某一时刻有多个线程同时获取到同步状态,获取判断是否获取同步状态成功的关键,获取到的同步状态要大于等于0。而其他步骤基本都是一致的,还是从源码开始分析起:带后缀Share都为共享式同步方法。

1)acquireShared(int arg)获取同步状态:如果获取失败则加入队尾,并且检查是否具备退出自旋的条件(前驱结点是头结点并且能成功获取同步状态)

public final void acquireShared(int arg) 
    // tryAcquireShared 获取同步状态,大于等于0才是获取状态成功,否则就是失败
    if (tryAcquireShared(arg) < 0)
        // 获取状态失败则构造共享Node,加入队列;
        // 并且检查是否具备退出自旋的条件:即preNode为head,并且能获取到同步状态
        doAcquireShared(arg);

2)doAcquireShared(arg):获取失败的Node加入队列,如果当前节点的前驱结点是头结点的话,尝试获取同步状态,如果大于等于0则在for(;;)中退出(退出自旋)。

private void doAcquireShared(int arg) 
	// 构造共享模式的Node
	final Node node = addWaiter(Node.SHARED);
	boolean failed = true;
	try 
	    boolean interrupted = false;
	    for (;;) 
	        final Node p = node.predecessor();
	        if (p == head) 
	            int r = tryAcquireShared(arg);
	            // 前驱节点是头结点,并且能获取状态成功,则return返回,退出死循环(自旋)
	            if (r >= 0) 
	                setHeadAndPropagate(node, r);
	                p.next = null; // help GC
	                if (interrupted)
	                    selfInterrupt();
	                failed = false;
	                return;
	            
	        
	        if (shouldParkAfterFailedAcquire(p, node) &&
	            parkAndCheckInterrupt())
	            interrupted = true;
	    
	 finally 
	    if (failed)
	        cancelAcquire(node);
	

3)releaseShared(int arg):释放同步状态,通过loop+CAS方式释放多个线程的同步状态。

public final boolean releaseShared(int arg) 
    if (tryReleaseShared(arg)) 
        // 通过loop+CAS方式释放多个线程的同步状态
        doReleaseShared();
        return true;
    
    return false;

以上是关于一文就懂AQS!的主要内容,如果未能解决你的问题,请参考以下文章

Java注解-一文就懂

一文看懂JUC之AQS机制

一文你就懂C程序内存布局

CAS和AQS一文搞懂

一文就懂HashMap原理!学不会你来砍我!

一文就懂HashMap原理!学不会你来砍我!