AQS

Posted 有心有梦

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS相关的知识,希望对你有一定的参考价值。

AQS

队列同步器(AbstractQueuedSynchronizer),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法(下面的三个方法)来进行操作。因为这三个方法能够保证状态的改变是安全的。

同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件。

同步器是实现锁或者其他任意同步组件的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程排队、等待与唤醒等底层操作。

队列同步器的设计是基于模版方法模式的,也就是说使用者需要继承同步器并重写指定的方法,随后将同步器子类组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。

重写同步器指定的方法时,需要使用同步器提供的如下三个方法来访问或者修改同步状态:

// 获取当前同步状态
protected final int getState() {
    return state;
}

// 设置当前同步状态
protected final void setState(int newState) {
    state = newState;
}

// 使用CAS设置当前状态,该方法能够保证状态设置的原子性
protected final boolean compareAndSetState(int expect, int update) {
   // See below for intrinsics setup to support this
   return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

同步器可重写的方法

  • protected boolean tryAcquire(int arg) :独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态;

  • protected boolean tryRelease(int arg):独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态;

  • protected int tryAcquireShared(int arg):共享式获取同步状态,返回大于等于0的值,表示获取成功,反之获取失败;

  • protected boolean tryReleaseShared(int arg):共享式释放同步状态;

  • protected boolean isHeldExclusively():当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程独占。

同步器提供的模板方法

实现自定义组件时,将会调用同步器提供的模板方法,这些模板方法基本上分为3类:独占式获取与释放同步状态、共享式获取与释放同步状态、查询同步队列中的等待线程情况。

1、独占式获取与释放同步状态

(1)独占式获取同步状态,如果当前线程获取同步状态成功,则为由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的tryAcquire方法。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

(2)与acquire相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException并返回。

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

(3)在acquireInterruptibly方法的基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false,如果获取到了返回true。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

(4)独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒。该方法将会调用重写的tryRelease方法。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

2、共享式获取与释放同步状态

(1)共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态。该方法会调用重写的tryAcquireShared方法。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

(2)与acquireShared方法相同,该方法响应中断。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

(3)在acquireSharedInterruptibly方法的基础上增加了超时限制;

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

(4)共享式的释放同步状态,该方法会调用重写的tryReleaseShared方法;

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

3、查询同步队列中的等待线程情况

获取等待在同步队列上的线程集合。

public final Collection<Thread> getQueuedThreads() {
    ArrayList<Thread> list = new ArrayList<Thread>();
    for (Node p = tail; p != null; p = p.prev) {
        Thread t = p.thread;
        if (t != null)
            list.add(t);
    }
    return list;
}

自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。

自定义一个独占锁的示例

独占锁就是在同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于同步队列等待中,只有获取锁的线程释放了锁,后继的线程才能获取锁。

package concurrent.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Mutex implements Lock {

    // 静态内部类,自定义同步器,重写对应的方法
    private static class Sync extends AbstractQueuedSynchronizer{

        /**
         * 判断是否处于独占状态
         * @return
         */
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        /**
         * 当状态为0的时候获取锁
         * @param arg
         * @return
         */
        @Override
        protected boolean tryAcquire(int arg) {

            if (compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        /**
         * 释放锁,将状态设置未0
         * @param arg
         * @return
         */
        @Override
        protected boolean tryRelease(int arg) {

            if (getState() == 0){
                throw new IllegalMonitorStateException();
            }

            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // 返回一个Condition,每个condition包含了一个condition队列
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    // 只需将操作代理到Sync上即可
    private final Sync sync = new Sync();
    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean hasQueuedThreads(){
        return sync.hasQueuedThreads();
    }

    public boolean isLocked(){
        return sync.isHeldExclusively();
    }
}

在这个示例中,独占锁Mutex是一个自定义同步组件,它在同一时刻只允许一个线程占有锁。在这个自定义组件中,定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。这也正符合了上面所说的设计一个自定义的锁的原则,即先继承AQS并重写相关方法,然后将同步器子类组合在自定义同步组件的实现中。

队列同步器的实现分析

1.同步队列

同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列中,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点。同步队列的基本结构如下所示:

image-20210608085936458

基本结构包括节点和同步器,节点是同步队列的基础,同步器拥有两个节点的引用,head引用指向头节点,tail引用指向尾节点。没有成功获取同步状态的线程将会成为节点加入该队列的尾部。

因为在同一时刻获取同步状态失败的线程可能会有多个,所以这些线程被构造成为节点加入到同步队列时必须要保证线程安全,因此同步器提供了一个基于CAS算法设置尾部节点的方法:compareAndSetTail(Node expect,Node update),调用该方法时,需要传递当前线程“期望”的尾部节点和当前线程构造的节点,只有设置成功后,当前节点才正式与之前的尾部节点建立关联。

同步队列遵循FIFO原则,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。

image-20210608093038551

设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可。

head = head.next;
head.prev.next = null

2.独占式同步状态获取与释放

独占式同步状态获取流程:

image-20210608105237689

独占式同步状态的获取是通过调用acquire完成的,该方法对中断不敏感,即线程获取同步状态失败以后,进入到同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

该方法的执行逻辑是:

​ 1、首先调用自定义同步器实现的tryAcquire方法获取同步状态,获取成功则该方法执行完毕,退出;获取失败,则执行下一步操作

​ 2、同步状态获取失败后,创建同步节点(构造成独占式(EXCLUSIVE)节点),并通过addWaiter方法将该节点加入到同步队列的尾部;

​ 3、最后调用acquireQueued方法,使该节点以“死循环”的方式获取同步状态。如果获取不到,则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或者阻塞线程被中断来实现。

下面是节点的构造和入队过程:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
     for (;;) {
         Node t = tail;
         if (t == null) { // Must initialize
             if (compareAndSetHead(new Node()))
                 tail = head;
         } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
    }
}

compareAndSetTail方法能够确保节点被线程安全地添加到队列中。在enq方法中,同步器通过“死循环”的方保证节点的正确添加,在“死循环”中只有通过CAS将节点设置为尾节点之后,当前线程才能返回,否则将不断自旋尝试设置。可以看出enq方法将并发添加节点的请求通过CAS操作变得“串行化”了。

节点进入到同步队列以后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自旋观察,当满足条件并获取到了同步状态以后就可以退出自旋过程。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            
            // 只有在前驱节点等待状态为SIGNAL的情况下才会将自己阻塞,因为SIGNAL状态下的节点会在状态改变时唤醒后继节点
            if (shouldParkAfterFailedAcquire(p, node) &&  // 尝试获取同步状态失败后,判断当前节点是否应该被阻塞
                parkAndCheckInterrupt()) // 将当前线程阻塞,并检查此过程中线程是否被中断
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在acquireQueued方法中,当前线程在死循环中尝试获取同步状态,但是,只有前驱节点是头结点时才能够尝试获取同步状态,从上面的源码中就可以看出。这么的原因有两个:

(1)头结点是成功获取到同步状态的节点,而头结点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头结点。

(2)维护同步队列的FIFO原则。因为某个非首节点因为中断而从等待状态返回时,会唤醒其后继节点线程,但是它并不是首节点,所以其后继节点也不应该参与同步状态的获取过程,所以当同步队列中的某个非首节点被唤醒时,还需先判断其前驱节点是否为首节点,是首节点的话,才能继续尝试获取同步状态,否则不能尝试获取同步状态。

当同步状态获取成功以后,当前的线程会直接从acquire方法返回,对于锁这种并发组件来说,这代表了当前线程获取到了锁。

线程获取到了同步状态并执行了相应的逻辑以后,就需要释放同步状态,使得后续节点能够继续获取同步状态。release方法用于释放同步状态,该方法在释放了同步状态以后,会唤醒其后继节点,唤醒工作由unparkSuccessor方法完成。

public final boolean release(int arg) {
    if (tryRelease(arg)) { //尝试释放同步状态,自定义同步组件时需重写
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); //唤醒h的后继节点
        return true;
    }
    return false;
}

注意:在重写tryRelease方法时,当前线程重入同一个锁arg次,那么释放的时候也需要释放arg次,锁才会真正被释放。

release方法一般被ReentrantLock类的unLock方法调用。

3.独占式超时获取同步状态

独占式超时获取同步状态流程:

image-20210608110511253

tryAcquireNanos方法是在独占式获取同步状态的基础上,又拥有了中断响应和超时获取的特性。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

其内部的核心逻辑是先判断当前线程是否被其他线程进行了中断标记操作,如果有,则直接判处中断异常。没有继续向下执行,首先尝试获取同步状态,获取成功则直接返回;获取失败则执行doAcquireNanos方法进行超时获取同步状态。

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

该方法在自旋过程中,当节点的前驱节点为头结点时尝试获取同步状态,如果获取成功则从该方法返回;如果获取失败,那么会计算时间间隔nanosTimeout,它的值等于:

nanosTimeout = deadline - System.nanoTime()

其中deadline的值在刚进入方法时设置:

deadline = System.nanoTime() + nanosTimeout;  // 此处的nanosTimeout为我们指定的超时时间

也就是说每次时间间隔等于预先设置好的超时时间加上最开始的系统时间然后减去现在执行这条语句的时间,这就是新的时间间隔:

​ (1)如果这个时间间隔小于等于0,说明线程超时获取的时间已经超过了规定的超时时间,那么直接返回false。否则继续执行2、3步骤;

​ (2)如果这个时间间隔大于0就继续自旋过程,其中如果nanosTimeout大于1000纳秒,那么将会使该线程进行超时等待,等待时间为nanosTimeout,等待时间到达nanosTimeout以后会从LockSupport.parkNanos返回;

​ (3)如果nanosTimeout小于1000纳秒,那么将不会使得该线程进行超时等待,而是进入快速自旋过程。这么做的原因在于,非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让nanosTimeout的超时从整体上表现得反而不精确。所以在超时时间间隔非常短的情况下,同步器会进入无条件的快速自选过程。

4.共享式同步状态获取与释放

共享式同步状态的获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。共享式访问资源时,其他共享式的访问均被允许,y独占式访问被阻塞;独占式访问资源时,同一时刻其他访问均被阻塞。

通过调用同步器的acquireShared方法可以共享式地获取同步状态:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

调用tryAcquireShared方法尝试获取同步状态,其返回值类型为int类型,当返回值类型大于等于0时,表示能够获取到同步状态。因此,在共享式获取的自选过程中,成功获取到同步状态并退出自旋的条件就是tryAcquireShared方法返回值大于等于0。如果返回值小于0,那么就执行doAcquireShared方法,进入自选过程:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED); //构造共享式节点
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

可以看出,doAcquireShared方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出。

共享式获取也需要释放同步状态,通过调用releaseShared方法可以释放同步状态:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。不过需要注意的,释放同步状态的操作可能会同时来自多个线程,所以releaseShared方法必须确保同步状态的线程安全释放,这一般是通过循环和CAS来保证的。

参考:《Java并发编程的艺术》

以上是关于AQS的主要内容,如果未能解决你的问题,请参考以下文章

AQS

源码剖析:AQS-AbstractQueuedSynchronizer

AQS 同步组件学习

并发编程Java并发编程-看懂AQS的前世今生

多线程(十AQS原理-ReentrantLock实现)

Java并发包基石-AQS详解