AQS(AbstractQueuedSynchronizer)——源码分析

Posted 李子捌

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS(AbstractQueuedSynchronizer)——源码分析相关的知识,希望对你有一定的参考价值。

有经典,有干货,微信搜索【李子捌】关注这个每日更新的程序员


简介

AbstractQueuedSynchronizer(队列同步器),是用来构建锁或者其他同步组件的基础框架,它通过使用一个int类型的变量来表示同步状态的同时内置FIFO队列来完成资源获取线程的排队工作,AbstractQueuedSynchronizer是大部分同步需求实现的基础。

1、Lock接口

在学习AbstractQueuedSynchronizer之前,先了解一下Lock接口。了解Lock接口之前呢,先了解一下什么是锁?锁是用来控制多个线程访问共享资源的方式,一个互斥锁能够防止多个线程同时访问共享资源。在Lock接口出现之前,我们都知道Java程序员是通过synchronized关键字来实现锁功能的,而Java SE 5之后,并发包中新增Lock接口及其相关实现用来实现所得功能,Lock接口及其实现提供了与synchronized类似的同步功能,相比之下有如下不同点。

  1. Lock需要显示的获取锁和释放锁,虽然便捷性低,但是具有更强的可操作性,synchronized可以隐式的获取锁,但是其获取锁的方式是固化的,也就是先获取再释放
  2. Lock具有可中断性、超时获取锁能多种synchronized不具备的功能

1.1 Lock接口提供的synchronized关键字不具备的主要特性

特性描述
尝试非阻塞的获取锁当前线程尝试获取锁,如果这一时刻锁没有被其他线程获取到,则成功获取并持有锁
能被中断的获取锁lock锁持有的线程能够响应中断,当线程被中断时,抛出中断异常,释放锁
超时获取锁在指定时间内获取锁,超过指定时间未获取到锁,返回

1.2 使用示例

Lock lock = new ReentrantLock();
lock.lock();
try {
    // Todo...
} finally {
	lock.unlock();
}

如上代码需要注意的一点是,锁的释放一定要在finally块中释放锁,目的是保证在获取到锁之后,最终一定会被释放。不要将锁的获取写在try代码块中,因为如果锁获取的时候发生了异常,那么此时是不应该执行释放锁的代码块的。

1.3 Lock接口主要API(获取锁/释放锁)

public interface Lock {

    /**
     * 获取锁;调用该方法的线程将会获取锁,当获取到锁后,从该方法返回
     */
    void lock();

    /**
     * 可以中断的获取锁,和Lock方法相比不同之处在于该方法可以响应中断,即在锁获取的过程中可以中断当前线程
     *
     * @throws InterruptedException
     */
    void lockInterruptibly() throws InterruptedException;

    /**
     * 尝试非阻塞的获取锁,调用该方法后立即返回,如果能够获取则返回true,否则返回false
     *
     * @return
     */
    boolean tryLock();

    /**
     * 超时获取锁,如下情况会返回:
     * 1、当前线程在指定超时时间内获取到锁
     * 2、当前线程在超时时间内被中断
     * 3、超时时间结束,返回false
     *
     * @param time
     * @param unit
     * @return
     * @throws InterruptedException
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 释放锁
     */
    void unlock();

    /**
     * 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获取了锁,才能调用该组件的wait()方法,而调用后,当前线程会释放锁
     *
     * @return
     */
    Condition newCondition();
}
方法名称描述
void lock()​获取锁;调用该方法的线程将会获取锁,当获取到锁后,从该方法返回
void lockInterruptibly() throws InterruptedException;可以中断的获取锁,和Lock方法相比不同之处在于该方法可以响应中断,即在锁获取的过程中可以中断当前线程
boolean tryLock();尝试非阻塞的获取锁,调用该方法后立即返回,如果能够获取则返回true,否则返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;超时获取锁,如下情况会返回:1、当前线程在指定超时时间内获取到锁2、当前线程在超时时间内被中断3、超时时间结束,返回false
void unlock();释放锁
Condition newCondition();获取等待通知组件,该组件和当前的锁绑定,当前线程只有获取了锁,才能调用该组件的wait()方法,而调用后,当前线程会释放锁

2、AbstractQueuedSynchronizer

AbstractQueuedSynchronizer(队列同步器)是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义,java.util.concurrent中许多可阻塞类,例如ReentrantLock、Semaohore、ReentrantReadWriteLock、CountDownLatch、SynchronuosQueue和FutureTask等,都是基于AQS构建的。这两者之间的关系如下:

  1. 锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节
  2. 同步器是面向锁的实现者,简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒的底层操作

2.1 AbstractQueuedSynchronizer的接口与示例

同步器的设计基于模板方法,使用者需要继承同步器并重写指定的方法,然后将同步器组合在自定义的同步组件的实现中,并调用同步器的提供的模板方法,而模板方法将会调用使用者(子类)的重写方法。
重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态

  1. getState():获取当前同步状态
  2. setState(int newState):设置当前同步状态
  3. compareAndSetState(int expect, int update) :使用CAS设置当前状态,该方法能够保证状态设置的原子性

同步器可重写方法如下:

   /**
     * 独占式获取同步状态,实现该方法需要查询当前状态并且判断同步状态是否符合预期,然后再进行CAS设置同步状态
     *
     * @param arg
     * @return
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 独占式释放同步状态,等待获取同步状态的线程将会有机会获取同步状态
     *
     * @param arg
     * @return
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 共享式获取同步状态,返回大于等于0的值,表示获取成功,反正获取失败
     *
     * @param arg
     * @return
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }


    /**
     * 共享式释放同步状态
     *
     * @param arg
     * @return
     */
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 当前同步器释放在独占模式下被线程占用,一般该方法表示释放被当前线程所占
     *
     * @return
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

总结:

方法名称描述
protected boolean tryAcquire(int arg)独占式获取同步状态,实现该方法需要查询当前状态并且判断同步状态是否符合预期,然后再进行CAS设置同步状态
protected boolean tryRelease(int arg)独占式释放同步状态,等待获取同步状态的线程将会有机会获取同步状态
protected int tryAcquireShared(int arg)共享式获取同步状态,返回大于等于0的值,表示获取成功,反正获取失败
protected boolean tryReleaseShared(int arg)共享式释放同步状态
protected boolean isHeldExclusively()当前同步器释放在独占模式下被线程占用,一般该方法表示释放被当前线程所占

同步器提供的模板方法主要分为3类:

  1. 独占式获取与释放同步状态
  2. 共享式获取与释放同步状态
  3. 查询同步队列中的等待线程情况
    /**
     * 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回;
     * 否则,将会进入同步等待队列等待,该方法将会调用重写的tryAcquire(int arg)方法
     *
     * @param arg
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * 与acquire方法相同,但是该方法可以响应中断,当前线程未获取到同步状态进入同步队列中
     * 如果当前线程被中断,则该方法抛出InterruptedException异常
     *
     * @param arg
     * @throws InterruptedException
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    /**
     * 在acquireInterruptibly方法的基础上增加了超时限制
     * 如果当前线程在超时时间内未获取到同步状态,则返回false,获取到则返回true
     *
     * @param arg
     * @param nanosTimeout
     * @return
     * @throws InterruptedException
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
    }

    /**
     * 共享式额获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待
     * 与独占式获取的主要区别是在同一时刻可以由多个线程同时获取到同步状态
     *
     * @param arg
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }


    /**
     * 与acquireShared方法相同,该方法响应中断
     *
     * @param arg
     * @throws InterruptedException
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    /**
     * 在acquireSharedInterruptibly的基础上增加了超时限制
     *
     * @param arg
     * @param nanosTimeout
     * @return
     * @throws InterruptedException
     */
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
                doAcquireSharedNanos(arg, nanosTimeout);
    }

    /**
     * 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒
     *
     * @param arg
     * @return
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            java.util.concurrent.locks.AbstractQueuedSynchronizer.Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * 共享式的释放同步状态
     *
     * @param arg
     * @return
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    /**
     *  获取等待在同步队列上的线程集合
     *
     * @return
     */
    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (java.util.concurrent.locks.AbstractQueuedSynchronizer.Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }


总结

方法名称描述
void acquire(int arg)独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回;否则,将会进入同步等待队列等待,该方法将会调用重写的tryAcquire(int arg)方法
void acquireInterruptibly(int arg)与acquire方法相同,但是该方法可以响应中断,当前线程未获取到同步状态进入同步队列中,如果当前线程被中断,则该方法抛出InterruptedException异常
tryAcquireNanos(int arg, long nanosTimeout)在acquireInterruptibly方法的基础上增加了超时限制,如果当前线程在超时时间内未获取到同步状态,则返回false,获取到则返回true
acquireShared(int arg)共享式额获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以由多个线程同时获取到同步状态
void acquireSharedInterruptibly(int arg)与acquireShared方法相同,该方法响应中断
tryAcquireSharedNanos(int arg, long nanosTimeout)在acquireSharedInterruptibly的基础上增加了超时限制
boolean release(int arg)独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒
boolean releaseShared(int arg)共享式的释放同步状态
Collection getQueuedThreads()获取等待在同步队列上的线程集合

2.2 自定义独占锁加强AbstractQueuedSynchronizer的工作原理的理解

独占锁指的是,同一时刻只能有一个线程获取到锁,其他获取锁的线程只能处于等待队列中等待,只有获取到锁的线程释放了锁,后继的线程才能获取到锁。(不太了解的可以写一遍,基本上就懂了)

package com.lizba.p5;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * <p>
 *      自定义独占锁
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/6/19 22:40
 */
public class Mutex implements Lock {

    private static class Sync extends AbstractQueuedSynchronizer {

        /**
         * 尝试获取锁
         *
         * @param arg
         * @return
         */
        @Override
        protected boolean tryAcquire(int arg) {
            // 当前状态如果为0则获取到锁
            if (compareAndSetState(0, 1)) {
                // 设置锁的占用线程为当线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        /**
         * 尝试释放锁
         *
         * @param arg
         * @return
         */
        @Override
        protected boolean tryRelease(int arg) {
            // 如果当前同步状态为0,调用该方法则抛出异常
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            // 清空占用线程
            setExclusiveOwnerThread(null);
            // 设置共享状态为0
            setState(0);
            return true;
        }

        /**
         * 判断当前线程是否处于占用状态
         * @return
         */
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        /**
         * 返回一个Condition,每个Condition包含一个condition队列
         *
         * @return
         */
        Condition condition() {
            return new ConditionObject();
        }
    }

    // 将需要的操作代理至Sync上
    private Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.condition();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
}

总结自定义同步组件Mutex互斥锁:

通过自定义同步组件Mutex我们可以看出。Mutex定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。用户在使用Mutex的时候并不会直接和内部同步器打交道,而是调用Mutex提供的方法,在Mutex的实现中以获取锁的lock()方法为例,只需要在方法实现中调用同步器的模板方法acquire(int args)即可。这种实现方法大大降低了实现一个可靠自定义同步组件的门槛。(不多说_Doug Lea牛逼_)

3、AbstractQueuedSynchronizer实现分析

分析的主要内容包括如下几个方面

  1. 同步队列
  2. 独占式同步状态获取与释放
  3. 共享式同步状态获取与释放
  4. 超时获取同步状态

3.1 同步队列

同步队列实现依赖的是内部的一个(FIFO)的同步队列来完成同步状态管理的,而这个队列的重中之重就是AbstractQueuedSynchronizer的内部类Node,这个Node节点是用来保存同步失败的线程引用、等待状态以及前驱prev和后继next节点。
节点源码重点部分解释:

static final class Node {

    /**
     * 等待状态
     * 0 					初始状态
     * -3 = PROPAGATE 		表示下一次共享式同步状态获取将会无条件地传播下去
     * -2 = CONDITION		节点线程等待在Condition上,当其他线程对Condition调用了signal()后,节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
     * -1 = SIGNAL			当前节点的线程释放同步状态或者被取消,则通知后继节点,使其节点线程得以运行
     * 1 = CANCELLED		同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消,进入该状态后将不会在改变状态
     */
    volatile int waitStatus;
    
    /**
     *	前驱节点
     */
    volatile Node prev;
    
    /**
     *	后继节点
     */
    volatile Node next;
    
    /**
     *	获取同步状态的线程
     */
    volatile Thread thread;
    
    /**
     *	等待在Condition上的后继节点。当前节点是共享模式时,含义特殊,它也代表节点类型(独占/共享)
     */
     Node nextWaiter;

}

在AbstractQueuedSynchronizer的内部类代表同步队列中的节点,而AbstractQueuedSynchronizer会持有首节点(head)和尾节点(tail),获取同步状态失败的节点加入队列的尾部。

/**
 * AbstractQueuedSynchronizer中持有的同步队列的头节点
 */
private transient volatile Node head;

/**
 * AbstractQueuedSynchronizer中持有的同步队列的尾节点
 */
private transient volatile Node tail;

通过一组图来查看AQS的结构

AQS同步队列的基本结构

  1. 同步器持有首节点和尾节点,初始都为null
  2. 获取同步状态失败的线程节点加入队列尾部

在这里插入图片描述

AQS同步队列加入节点

  1. 通过compareAndSetTail(Node expect, Node update)提供的CAS操作来正确的设置尾节点
  2. 加入尾节点成功后,将原先尾节点的后继节点指向新的尾节点,新的尾节点的前驱节点设置为原尾节点
    在这里插入图片描述

AQS同步队列设置首节点

  1. 同步队列节点顺序出入队遵循FIFO
  2. 原先首节点释放同步状态,唤醒后继节点,后继节点获取同步状态成功后设置自己为首节点
    在这里插入图片描述

3.2 独占式同步状态获取与释放

独占式同步状态获取通过调用同步器的acquire(int arg)获取同步状态,注意该方法不响应中断。

获取解析

acquire(int arg)源码解析

/**
 * 方法主要完成以下功能
 * 1、线程安全的同步状态获取
 * 2、节点构造加入同步队列
 * 3、同步队列中自旋
 */
public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          selfInterrupt();
 }

acquire(int arg)之addWaiter(Node mode)

/**
 * 尝试将获取同步状态失败的节点通过CAS的方式加入同步队列尾部
 */
private Node addWaiter(Node mode) {
    // 创建一个新的节点,设置节点信息,和节点线程为当前线程
    Node node = new Node(Thread.currentThread(), mode);
    // 获取尾节点,当尾节点不为空的时候,尝试设置尾节点
    Node pred = tail;
    if (pred != null) {
        // 设置当前插入节点的前驱节点为当前同步队列中的尾节点
        node.prev = pred;
        // 通过CAS快速设置尾节点为当前插入的节点
        if (compareAndSetTail(pred, node)) {
            // 如设置尾节点成功,将pred节点(同步队列上一个的尾节点,此时新的尾节点为插入节点)的后继节点指向新插入的节点(新的尾节点)
            pred.next = node;
            // 返回节点对象
            return node;
        }
    }
    enq(node);
    return node;
}

addWaiter(Node mode)之 enq(node)

/**
 * 通过“死循环”的方式来正确的添加节点
 */
private Node enq(final Node node) {
   	// 不断循环,直至CAS插入节点成功
    for (;;) {
        Node t = tail;
        if (t == null) { 
            // 当尾节点为null,此时需要初始化头节点和尾节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 插入节点前驱节点指向原先尾节点
            node.prev = t;
            // CAS插入至同步队列的尾节点
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquire(int arg)之 acquireQueued(final Node node, int arg)

/**
 * 死循环获取同步状态,并且当前仅当前驱节点是头节点是才能够尝试获取同步状态
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 不断循环
        for (;;) {
            // 获取当前节点的前驱节点,如果前驱节点为null将会抛出空指针异常
            final Node p = node.predecessor();
            // 如果当前节点的前驱节点是头节点,尝试获取同步状态
            if (p == head && tryAcquire(arg)) {
                // 设置当前节点为头节点,并且将节点线程和节点的前驱节点置为null,help GC
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果不符合条件,则判断当前节点前驱节点的waitStatus状态来决定是否需要挂起LockSupport.park(this);
            if (shouldParkAfterFailedAcquire(pAQS 原理以及 AQS 同步组件总结

AQS(AbstractQueuedSynchronizer)源码深度解析—AQS的设计与总体结构

4.从AbstractQueuedSynchronizer(AQS)说起——AQS结语

AQS 详解

AQS学习

Java AQS学习