深入理解AbstractQueuedSynchronizer

Posted Dream_it_possible!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解AbstractQueuedSynchronizer相关的知识,希望对你有一定的参考价值。

目录

什么是AQS?

AQS类、核心方法和属性分析

1.  AbstractOwnerSynchronizer抽象类。

2. 初识静态内部类Node和内部类ConditionObject

        问题1:  在AQS中,为什么会有Node静态内部类和ConditionObject内部类?

        问题2: 在AQS中,为什么要维护2个队列呢?一个同步队列不够用嘛?

3. 深入理解条件等待队列

4.深入理解同步等待队列

        问题3: 处于阻塞等待的线程是如何在同步队列里被唤醒的?

5.条件队列转同步等待队列 

小结


什么是AQS?

         AbstractQueuedSynchronizer简称AQS,它 是一个抽象的队列同步器,AQS通过维护一个共享的资源状态state, 其中state 被关键字volatile修饰,和一个先进先出的线程等待队列来实现多线程环境下的共享资源的同步访问。AQS在并发编程中是一个非常重要的抽象类,AbstractQueuedSynchronizer类在java.uti.concurrent.locks包下的,很多并发关键字,例如,CountDownLatch、CyclicBarrier、ReentrantLock、ReentranReadWriteLock、Semaphore等核心都是基于AQS实现的,在学习中,我们不光要知道他的用法,掌握AQS的核心原理是至关重要的,在学习的过程中,阅读源代码和官方的注释能够帮我们事半功倍。

AQS类、核心方法和属性分析

1.  AbstractOwnerSynchronizer抽象类。

       首先,我们从类的角度分析,AbstractQueuedSyunchronizer继承了抽象类AbstractOwnerSynchronizer,该抽象类只有2个方法,一个setter和一个getter加一个核心属性exclusiveOwnerThread。

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable 

    /** Use serial ID even though all fields transient. */
    private static final long serialVersionUID = 3737899427754241961L;

    /**
     * Empty constructor for use by subclasses.
     */
    protected AbstractOwnableSynchronizer()  

    /**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * Sets the thread that currently owns exclusive access.
     * A @code null argument indicates that no thread owns access.
     * This method does not otherwise impose any synchronization or
     * @code volatile field accesses.
     * @param thread the owner thread
     */
    protected final void setExclusiveOwnerThread(Thread thread) 
        exclusiveOwnerThread = thread;
    

    /**
     * Returns the thread last set by @code setExclusiveOwnerThread,
     * or @code null if never set.  This method does not otherwise
     * impose any synchronization or @code volatile field accesses.
     * @return the owner thread
     */
    protected final Thread getExclusiveOwnerThread() 
        return exclusiveOwnerThread;
    

接着我们看一下该类官方给出的注释: A synchronizer that may be exclusively owned by a thread, 翻译为:  一个同步者或许是独占式的,只能被一个线程所拥有,简单的讲就是独占式的锁模式下,一般只有一个线程能够访问该共享资源,其他线程不能访问,即exclusiveOwnerThread为currentThread,主要的功能是用来资源的加锁和解释相关的服务调用,该类的具体用法会在接下来的内容中得到解释。

2. 初识静态内部类Node和内部类ConditionObject

        看到Node属性,我们可能会联想到单链表,每个node持有指针域和值, 单链表中的Node可以通过next指针将多个node串联起来, 链表中的第一个结点就是Head, 尾结点一般叫做Tail,这里的Node是用来做同步等待队列。

        问题1:  在AQS中,为什么会有Node静态内部类和ConditionObject内部类?

        首先我们看一下官方解释Wait queue node class,解释为Node是为了同步等待队列设计的,同步等待队列是一个双向链表,含有next指针和prev指针。

               +------+  prev      +-----+          +-----+
       head |        |  <----       |         |  <---- |       |  tail
               +------+ --next- >  +-----+   ----> +-----+

        ConditionObject 是AbstractQueuedSynchronizer类里的一个内部类, 主要作用是标记一个条件等待队列,也是利用Node节点来实现,不过条件等待队列里使用的单链表结构 。

        在上文中我们说到AQS其实是一个抽象的队列同步器,提到条件等待队列,那AQS种为什么要用到条件等待队列呢? 只有一个同步队列不够用嘛?

        问题2: 在AQS中,为什么要维护2个队列呢?一个同步队列不够用嘛?

        想要寻找问题的答案,那么我们需要先了解2个概念,独占式锁和共享式锁,独占式的锁指的是一个共享资源只能被一个线程所持有,在持有的过程中,其他线程不能抢占,只能等待资源被释放后,其他线程才能去争夺重新加锁,例如ReentrantLock就是独占式锁,共享式锁指的是多个线程能够能按照一定的顺序或规则去对共享资源加锁,比如Semaphore,处于同步队列的头结点A能首先争夺到state,等头结点执行完毕后,state-1, 其他在同步队列里的结点能在A结点释放资源后继续争夺该state, 等state=0时,所有线程加锁执行完毕后,该资源将会被释放掉条件duilie,同步队列的作用就体现出来了,用于等待正在被其他线程所占用的共享资源释放,同时保证了多个线程执行任务的同步,共享模式下,有线程持有共享资源时,其他能持有共享锁的线程会进入到条件等待队列里进行wait,然后等待唤醒, 简单的讲,条件等待队列是为了阻塞设计的。

        那么条件队列是个什么东西呢?接着看条件等待队列的原理。

3. 深入理解条件等待队列

         理解了上述的两个概念后,我们可以先看AbstractQueuedSynchronizer里的一个方法: addConditionWaiter(), 该方法的作用是往条件等待队列里添加结点,将当前线程添加到Node里,然后属性为Node.CONDITION, 由于条件等待队列是一个单链表,每次插入结点从队尾插入,每次最后的赋值是lastWaiter.next=node,因此是从单链表的指向方向插入一个新的结点到条件等待队列里。


        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() 
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) 
                unlinkCancelledWaiters();
                t = lastWaiter;
            
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        

        此方法是将结点添加到条件同步队列里,如果有正在执行的线程的waitStatus=-2时,那么该线程会进入到该条件队列里,等待被唤醒,在Node静态内部类里含有一个常量属性waitStatus, 其中CONDITION表示处于条件等待队列里的线程。

static final int CONDITION = -2;

        该常量属性的官方解释为:  waitStatus value to indicate thread is waiting on condition,该值为-2时表示该线程处于等待状态。 

        画了一个图,简述节点插入到条件等待队列waitQueue里的过程。 

         总而言之,条件等待队列是为同步等待队列准备的,是为资源的同步、阻塞服务的,这样的插入方式其实是为了实现一个先进先出的单向队列,接着看同步等待队列。

4.深入理解同步等待队列

        理解了条件等待队列里的节点插入原理后,再看另外一个方法addWaiter(Node node), 该方法的作用是往同步队列里插入节点,其中enq()方法包含了创建同步队列的方法和过程,也是AQS的核心算法之一,非常的精髓!

   /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) 
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) 
            node.prev = pred;
            if (compareAndSetTail(pred, node)) 
                pred.next = node;
                return node;
            
        
    // 如果tail节点为空,那么表示现在是进入同步等待队列里。
        enq(node);
        return node;
    

        我们都知道线程可以从阻塞状态变为唤醒状态,那么唤醒状态的线程会存放到同步队列里,此方法的用来接收处于唤醒状态的结点,只有处于唤醒的状态的结点在同步队列中可以去争夺共享资源。

        enq方法通过循环,将初始化队列的过程展示出来,最终形成的结果是一个双向链表。

    private Node enq(final Node node) 
        for (;;) 
            Node t = tail;
            if (t == null)  // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
             else 
                node.prev = t;
                if (compareAndSetTail(t, node)) 
                    t.next = node;
                    return t;
                
            
        
    

        下面图解同步队列的创建过程:

         由图可以发现,创建的最终结果为一个双向链表, 这下我们知道了AQS是怎样往同步队列里插入一个新的结点!

        看到这里,条件等待队列的作用就显现出来的,一方面存放阻塞的线程,一方面是为了同步等待队列服务的。

        问题3: 处于阻塞等待的线程是如何在同步队列里被唤醒的?

        如果没有真正的去看过源码,那么真的会难以理解,首先处于等待的节点在条件等待队列里,按照先进先出的原则,依次出队,出队后会插入到同步等待队列里,然后会进行一个compareAndSetWaitStatus(p, ws, Node.SIGNAL)方法将节点的状态职为SINGAL状态,置好了后,再由LockSupport.unpark(node.thread)将线程真正的唤醒。

        为了理解同步队列的作用,直接看一个例子CountDownLatch, 它的实现是采用共享式的Node, 一个线程用完共享资源后,会执行doReleaseShared()方法,该方法的作用和原理如下:

        1. 通过循环,拿到同步队列头节点,如果头节点是处于SIGNAL状态,那么就将头节点的状态置为0, 节点的默认的waitStatus状态为0, 官方解释: It will not be used as a sync queue node until transferred, at which time the status will be set to 0, 即同步队列的头节点释放资源相当于从同步队列里移除。

    private void doReleaseShared() 
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) 
            Node h = head;
            if (h != null && h != tail) 
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) 
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                     // 唤醒下一个节点
                    unparkSuccessor(h);
                
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            
            if (h == head)                   // loop if head changed
                break;
        
    

        2. 然后接着执行unparkSuccessor(h), 此方法的作用是唤醒继任者, 即把共享锁让给继任者。 Node s = node.next, 通过LockSupport.unpark(s.thread)方法去唤醒。

   if (s != null)
        LockSupport.unpark(s.thread);

         unparkSuccessor()方法代码如下: 


    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) 
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) 
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        
        if (s != null)
            LockSupport.unpark(s.thread);
    

        从代码里可以找到答案,就是通过循环,拿到同步队列的头节点,然后通过将头节点的状态置为0然后出队列,接着调用unparkSucceccor(h)方法,唤醒下一个节点。

        同步队列节点唤醒的逻辑弄清楚了,那么我们可以继续深究条件队列是怎么转化到同步等待队列里的。

5.条件队列转同步等待队列 

        我们都知道线程有多种状态,一旦线程处于阻塞状态,那么就需要被唤醒,AQS中提供了2个方法doSignal()和doSignalAll()方法, 表示随机唤醒一个阻塞线程、唤醒全部阻塞线程。

    private void doSignal(Node first) 
            do 
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
             while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        

        /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) 
            lastWaiter = firstWaiter = null;
            do 
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
             while (first != null);
        

        上述两个方法都执行了一个同样方法transferForSignal(Node node)方法,该方法的作用是将条件同步队列里的节点出来,然后进入到同步队列里。 

    final boolean transferForSignal(Node node) 
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    

      然后会通过compareAndSetWaitStatus(p, ws, Node.SIGNAL)方法将节点的状态职为SINGAL状态,置好了后,再由LockSupport.unpark(node.thread)将线程真正的唤醒。

小结

        针对上述的问题,小结一下: 

        1.  条件队列conditionObject用于标记一组处于等待的线程,waitStatus=-2时才入此队列。

        2.  条件队列用来存放存于阻塞的线程, 等待被其他线程唤醒,唤醒的过程需要进行条件队列到同步队列的转换。

        3.  条件队列为进入到同步队列通过transferForSingal()方法转换,主要进行了2个操作,一是进入到同步队列,二是将节点的状态变更为SINGAL状态。

        4. 在释放共享锁时,如果节点的状态为SINGAL会将节点的状态置为默认值0,如果同步队列的后续结点需要被唤醒,那么同步队列里的下一个节点唤醒,保证共享资源被完全释放掉。

以上是关于深入理解AbstractQueuedSynchronizer的主要内容,如果未能解决你的问题,请参考以下文章

深入理解Tomcat

深入理解计算机系统的目录1

深入理解Eureka覆盖状态(九)

INN实现深入理解

深入理解Block

YOLO v1深入理解