AQS源码剖析第一篇---全貌概览

Posted 热爱编程的大忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS源码剖析第一篇---全貌概览相关的知识,希望对你有一定的参考价值。

AQS源码剖析第一篇---全貌概览


引言

AQS全称AbstractQueuedSynchronizer(基于队列实现的抽象同步器),它是 Java 并发包的基础工具类,是实现 ReentrantLock、CountDownLatch、Semaphore、FutureTask 等类的基础。

  • AQS能为我们提供什么?
    • 同步状态的原子性管理。
    • 线程的阻塞和解除阻塞。
    • 提供阻塞线程的存储队列。

基于这三大功能,衍生出下面的附加功能:

  • 通过中断实现的任务取消,此功能基于线程中断实现。
  • 可选的超时设置,也就是调用者可以选择放弃等待任务执行完毕直接返回。
  • 定义了Condition接口,用于支持管程形式的await/signal/signalAll操作,代替了Object类基于JNI提供的wait/notify/notifyAll。

AQS还根据同步状态的不同管理方式区分为两种不同的实现:独占状态的同步器和共享状态的同步器。


AQS 结构

先来看看 AQS 有哪些属性,搞清楚这些基本就知道 AQS 是什么套路了,毕竟可以猜嘛!

// 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的
private transient volatile Node head;

// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
private transient volatile Node tail;

// 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
// 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
private volatile int state;

// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) state++
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

CLH队列

AQS的核心在于他的队列实现,AQS借鉴了CLH的思想,并衍生出符合AQS场景下的变体实现,因此我们需要先来了解一下CLH队列实现思想。

CLH锁底层是基于队列实现,一般也称为CLH队列锁。CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。从实现上看,CLH锁是一种自旋锁,能确保无饥饿性,提供先来先服务的公平性。先看简单的CLH锁的一个简单实现:

package com.TheadTest;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class CLHLock implements Lock 

    /**
     * 队列尾部节点
     */
    AtomicReference<QueueNode> tail = new AtomicReference<>(new QueueNode());

    /**
     * 前驱节点
     */
    ThreadLocal<QueueNode> pred;

    /**
     * 当前线程对应的节点
     */
    ThreadLocal<QueueNode> current;

    /**
     * 初始化头结点,头结点的前驱节点为空
     */
    public CLHLock() 
        current = ThreadLocal.withInitial(QueueNode::new);
        pred = ThreadLocal.withInitial(() -> null);
    

    /**
     * 1.拿到当前线程对应的节点,并上锁 <p>
     * 2.设置尾结点为当前节点,并返回旧的尾结点 <p>
     * 3.旧的尾结点,作为当前节点的前驱节点 <p>
     * 4.不断轮询,直到前驱节点释放了锁 <p>
     *
     * ps: 没获取到锁,就一直阻塞,直到前驱节点释放了锁
     */
    @Override
    public void lock() 
        QueueNode node = current.get();
        node.locked = true;
        QueueNode pred = tail.getAndSet(node);
        this.pred.set(pred);
        while (pred.locked) 
        
    

    /**
     * 1.获取当前线程对应的节点 <p>
     * 2.当前节点解锁 <p>
     * 3.设置当前节点重新指向它的前驱节点 <p>
     */
    @Override
    public void unlock() 
        QueueNode node = current.get();
        node.locked = false;
        current.set(this.pred.get());
    


    static class QueueNode 
        //确保线程可见性  
        volatile boolean locked;
    

    // 忽略其他接口方法的实现

    @Override
    public Condition newCondition() 
        return null;
    

    @Override
    public void lockInterruptibly() throws InterruptedException 

    

    @Override
    public boolean tryLock() 
        return false;
    

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException 
        return false;
    

上面是一个简单的CLH队列锁的实现,内部类QueueNode只使用了一个简单的布尔值locked属性记录了每个线程的状态,如果该属性为true,则相应的线程要么已经获取到锁,要么正在等待锁,如果该属性为false,则相应的线程已经释放了锁。新来的想要获取锁的线程必须对tail属性调用getAndSet()方法,使得自身成为队列的尾部,同时得到一个指向前驱节点的引用pred,最后线程所在节点在其前驱节点的locked属性上自旋,直到前驱节点释放锁。上面的实现是无法运行的,因为一旦自旋就会进入死循环导致CPU飙升,可以尝试使用下文将要提到的LockSupport进行改造。

CLH队列锁本质是使用队列(实际上是单向链表)存放等待获取锁的线程,等待的线程总是在其所在节点的前驱节点的状态上自旋,直到前驱节点释放资源。从实际来看,过度自旋带来的CPU性能损耗比较大,并不是理想的线程等待队列的实现。


基于原始的CLH队列锁中提供的等待队列的基本原理,AQS实现一种了CLH锁队列的变体(Variant)。AQS类的protected修饰的构造函数里面有一大段注释用于说明AQS实现的等待队列的细节事项,这里列举几点重要的:

  • AQS实现的等待队列没有直接使用CLH锁队列,但是参考了其设计思路,等待节点会保存前驱节点中线程的信息,内部也会维护一个控制线程阻塞的状态值。
  • 每个节点都设计为一个持有单独的等待线程并且”带有具体的通知方式”的监视器,这里所谓通知方式就是自定义唤醒阻塞线程的方式而已。
  • 一个线程是等待队列中的第一个等待节点的持有线程会尝试获取锁,但是并不意味着它一定能够获取锁成功(这里的意思是存在公平和非公平的实现),获取失败就要重新等待。
  • 等待队列中的节点通过prev属性连接前驱节点,通过next属性连接后继节点,简单来说,就是双向链表的设计。
  • CLH队列本应该需要一个虚拟的头节点,但是在AQS中没有直接提供虚拟的头节点,而是延迟到第一次竞争出现的时候懒创建虚拟的头节点(其实也会创建尾节点,初始化时头尾节点是同一个节点)。
  • Condition(条件)等待队列中的阻塞线程使用的是相同的Node结构,但是提供了另一个链表用来存放,Condition等待队列的实现比非Condition等待队列复杂。

线程阻塞与唤醒

线程的阻塞和唤醒在JDK1.5之前,一般只能依赖于Object类提供的wait()、notify()和notifyAll()方法,它们都是JNI方法,由JVM提供实现,并且它们必须运行在获取监视器锁的代码块内(synchronized代码块中),这个局限性先不谈性能上的问题,代码的简洁性和灵活性是比较低的。JDK1.5引入了LockSupport类,底层是基于Unsafe类的park()和unpark()方法,提供了线程阻塞和唤醒的功能,它的机制有点像只有一个允许使用资源的信号量java.util.concurrent.Semaphore,也就是一个线程只能通过park()方法阻塞一次,只能调用unpark()方法解除调用阻塞一次,线程就会唤醒(多次调用unpark()方法也只会唤醒一次),可以想象是内部维护了一个0-1的计数器。

LockSupport类如果使用得好,可以提供更灵活的编码方式,这里举个简单的使用例子:

package com.TheadTest;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.locks.LockSupport;

public class LockSupportMain implements Runnable 

    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    private Thread thread;

    private void setThread(Thread thread) 
        this.thread = thread;
    

    public static void main(String[] args) throws Exception 
        LockSupportMain main = new LockSupportMain();
        Thread thread = new Thread(main, "LockSupportMain");
        main.setThread(thread);
        thread.start();
        Thread.sleep(2000);
        main.unpark();
        Thread.sleep(2000);
    

    @Override
    public void run() 
        System.out.println(String.format("%s-步入run方法,线程名称:%s", FORMATTER.format(LocalDateTime.now()),
                Thread.currentThread().getName()));
        LockSupport.park();
        System.out.println(String.format("%s-解除阻塞,线程继续执行,线程名称:%s", FORMATTER.format(LocalDateTime.now()),
                Thread.currentThread().getName()));
    

    private void unpark() 
        LockSupport.unpark(thread);
    

LockSupport类park()方法也有带超时的变体版本方法,遇到带超时期限阻塞等待场景下不妨可以使用LockSupport#parkNanos()。


独占线程的保存

AbstractOwnableSynchronizer是AQS的父类,一个同步器框架有可能在一个时刻被某一个线程独占,AbstractOwnableSynchronizer就是为所有的同步器实现和锁相关实现提供了基础的保存、获取和设置独占线程的功能,这个类的源码很简单:

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable 

    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer()  
    
    // 当前独占线程的瞬时实例 - 提供Getter和Setter方法
    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) 
        exclusiveOwnerThread = thread;
    

    protected final Thread getExclusiveOwnerThread() 
        return exclusiveOwnerThread;
    

它就提供了一个保存独占线程的变量对应的Setter和Getter方法,方法都是final修饰的,子类只能使用不能覆盖。


AQS提供的CLH变体实现

这里先重点分析一下AQS中等待队列的节点AQS的静态内部类Node的源码:

static final class Node 
   // 标记一个节点处于共享模式下的等待
   static final Node SHARED = new Node();
   // 标记一个节点处于独占模式下的等待
   static final Node EXCLUSIVE = null;
   // 取消状态
   static final int CANCELLED =  1;
   // 唤醒状态
   static final int SIGNAL    = -1;
   // 条件等待状态
   static final int CONDITION = -2;
   // 传播状态
   static final int PROPAGATE = -3;
   // 等待状态,初始值为0,其他可选值是上面的4个值
   volatile int waitStatus;
   // 当前节点前驱节点的引用
   volatile Node prev;
   // 当前节点后继节点的引用
   volatile Node next;
   // 当前节点持有的线程,可能是阻塞中等待唤醒的线程
   volatile Thread thread;
   // 下一个等待节点
   Node nextWaiter;
   // 当前操作的节点是否处于共享模式
   final boolean isShared() 
      return nextWaiter == SHARED;
   
   // 获取当前节点的前驱节点,确保前驱节点必须存在,否则抛出NPE  
   final Node predecessor() 
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    
    
    // 空节点,主要是首次创建队列的时候创建的头和尾节点使用
    Node() 

    // 设置下一个等待节点,设置持有线程为当前线程
    Node(Node nextWaiter) 
        this.nextWaiter = nextWaiter;
        THREAD.set(this, Thread.currentThread());
    

    // 设置waitStatus,设置持有线程为当前线程
    Node(int waitStatus) 
        WAITSTATUS.set(this, waitStatus);
        THREAD.set(this, Thread.currentThread());
    

    // CAS更新waitStatus  
    final boolean compareAndSetWaitStatus(int expect, int update) 
        return WAITSTATUS.compareAndSet(this, expect, update);
    
    // CAS设置后继节点
    final boolean compareAndSetNext(Node expect, Node update) 
        return NEXT.compareAndSet(this, expect, update);
    
    // 设置前驱节点
    final void setPrevRelaxed(Node p) 
        PREV.set(this, p);
    

    // 下面是变量句柄的实现,在VarHandle出现之前使用的是Unsafe,其实底层还是照样使用Unsafe
    private static final VarHandle NEXT;
    private static final VarHandle PREV;
    private static final VarHandle THREAD;
    private static final VarHandle WAITSTATUS;
    static 
        try 
            MethodHandles.Lookup l = MethodHandles.lookup();
            NEXT = l.findVarHandle(Node.class, "next", Node.class);
            PREV = l.findVarHandle(Node.class, "prev", Node.class);
            THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
            WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
         catch (ReflectiveOperationException e) 
            throw new ExceptionInInitializerError(e);
        
    	  
	

其中,变量句柄(VarHandle)是JDK9引入的新特性,其实底层依赖的还是Unsafe的方法,笔者认为可以简单理解它为Unsafe的门面类,而定义的方法基本都是面向变量属性的操作。这里需要关注一下Node里面的几个属性:

waitStatus:当前Node实例的等待状态,可选值有5个。

  • 初始值整数0:当前节点如果不指定初始化状态值,默认值就是0,侧面说明节点正在等待队列中处于等待状态,一般是队尾节点。
  • Node#CANCELLED整数值1:表示当前节点实例因为超时或者线程中断而被取消,等待中的节点永远不会处于此状态,被取消的节点中的线程实例不会阻塞。
  • Node#SIGNAL整数值-1:表示当前节点的后继节点是(或即将是)阻塞的(通过LockSupport#park()),当它释放或取消时,当前节点必须LockSupport#unpark()它的后继节点。
  • Node#CONDITION整数值-2:表示当前节点是条件队列中的一个节点,当它转换为同步队列中的节点的时候,状态会被重新设置为0。
  • Node#PROPAGATE整数值-3:此状态值通常只设置到调用了doReleaseShared()方法的头节点,确保releaseShared()方法的调用可以传播到其他的所有节点,简单理解就是共享模式下节点释放的传递标记。

prev、next:当前Node实例的前驱节点引用和后继节点引用。

thread:当前Node实例持有的线程实例引用。

nextWaiter:这个值是一个比较容易令人生疑的值,虽然表面上它称为”下一个等待的节点”,但是实际上它有三种取值的情况。

  • 值为静态实例Node.EXCLUSIVE(也就是null),代表当前的Node实例是独占模式。
  • 值为静态实例Node.SHARED,代表当前的Node实例是共享模式。
  • 值为非Node.EXCLUSIVE和Node.SHARED的其他节点实例,代表Condition等待队列中当前节点的下一个等待节点。

AbstractQueuedSynchronizer 的等待队列示意如下所示,注意了,之后分析过程中所说的 queue,也就是阻塞队列不包含 head,不包含 head,不包含 head。


源码流程追踪

我们从ReentrantLock的公平锁源码实现开始讲起,首先,我们先看下 ReentrantLock 的使用方式。

// 我用个web开发中的service概念吧
public class OrderService 
    // 使用static,这样每个线程拿到的是同一把锁,当然,spring mvc中service默认就是单例,别纠结这个
    private static ReentrantLock reentrantLock = new ReentrantLock(true);

    public void createOrder() 
        // 比如我们同一时间,只允许一个线程创建订单
        reentrantLock.lock();
        // 通常,lock 之后紧跟着 try 语句
        try 
            // 这块代码同一时间只能有一个线程进来(获取到锁的线程),
            // 其他的线程在lock()方法上阻塞,等待获取到锁,再进来
            // 执行代码...
            // 执行代码...
            // 执行代码...
         finally 
            // 释放锁
            reentrantLock.unlock();
        
    

ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的。

abstract static class Sync extends AbstractQueuedSynchronizer 

Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁),我们看 FairSync 部分。

public ReentrantLock(boolean fair) 
    sync = fair ? new FairSync() : new NonfairSync();


公平锁acquire源码实现

static final class FairSync extends Sync 
    private static final long serialVersionUID = -3000897897090466540L;
      // 争锁
    final void lock() 
        acquire(1);
    
      // 来自父类AQS,我直接贴过来这边,下面分析的时候同样会这样做,不会给读者带来阅读压力
    // 我们看到,这个方法,如果tryAcquire(arg) 返回true, 也就结束了。
    // 否则,acquireQueued方法会将线程压到队列中
    public final void acquire(int arg)  // 此时 arg == 1
        // 首先调用tryAcquire(1)一下,名字上就知道,这个只是试一试
        // 因为有可能直接就成功了呢,也就不需要进队列排队了,
        // 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待(又是挂起,又是等待被唤醒的)
        if (!tryAcquire(arg) &&
            // tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
              selfInterrupt();
        
    

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    // 尝试直接获取锁,返回值是boolean,代表是否获取到锁
    // 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取
    protected final boolean tryAcquire(int acquires) 
        final Thread current = Thread.currentThread();
        int c = getState();
        // state == 0 此时此刻没有线程持有锁
        if (c == 0) 
            // 虽然此时此刻锁是可以用的,但是这是公平锁,既然是公平,就得讲究先来后到,
            // 看看有没有别人在队列中等了半天了
            if (!hasQueuedPredecessors() &&
                // 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了,
                // 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_=
                // 因为刚刚还没人的,我判断过了
                compareAndSetState(0, acquires)) 

                // 到这里就是获取到锁了,标记一下,告诉大家,现在是我占用了锁
                setExclusiveOwnerThread(current);
                return true;
            
        
          // 会进入这个else if分支,说明是重入了,需要操作:state=state+1
        // 这里不存在并发问题
        else if (current == getExclusiveOwnerThread()) 《并发系列一》AbstractQueuedSynchronizer(AQS)- 互斥锁源码剖析

JDK源码那些事儿之传说中的AQS-概览

全网最详细的AbstractQueuedSynchronizer(AQS)源码剖析资源的获取和释放

AbstractQueuedSynchronizer AQS框架源码剖析

通俗易懂的JUC源码剖析-ReentrantLock&AQS

源码剖析:AQS-AbstractQueuedSynchronizer