Condition条件队列核心源码分析,AQS的独占模式的应用

Posted 可持续化发展

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Condition条件队列核心源码分析,AQS的独占模式的应用相关的知识,希望对你有一定的参考价值。

注:图片作者为小刘老师。哔哩哔哩搜索:小刘讲源码。

看前须知:

Condition的await/signal与object的wait/notify 类似。
wait/notify的流程
wait/notify是JVM层面实现的,c++写的。
condition的await(挂起)/signal(唤醒一个挂起的线程)
与object的wait/notify相比,区别在于:
object依赖对象的monitor。调用wait的时候会释放锁,将线程加入wait set中。
调用notify的时候,会唤醒wait set中的一个线程。
一个对象只有一个monitor。

Lock.newCondition方法返回一个condition。这个方法可以多次调用。
即给一个锁Lock创建多个条件队列。
 使用方式:
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();

第一种类型的任务:
同一个Lock.lock();获取锁
try{
    业务逻辑
    满足条件?:调用condition.await()挂起当前线程(挂起的时候,会释放锁Lock);
}finally{
    Lock.unlock();
}
第二种类型的任务:
使当前的对象满足某种条件,唤醒被挂起的线程
Lock.lock();
try{
    满足条件的一些逻辑
    满足条件后,同一个condition.signal/signalAll
}finally{
    Lock.unlock(); 
}

当前线程持有独占锁后,再去调用await。否则是错误的写法。
当线程持有独占锁后,再去signal。否则会抛出异常。

条件队列中的node节点的waitStatus 的可能值有:-2、1、0

条件队列中的节点状态:
-2,-2,-2
迁移到阻塞队列时,-2改为0。signal方法会做迁移操作。先做出队操作,再改状态,再入阻塞队列。
当调用await,执行完全释放锁的方法时,会检查当前线程是否为独占锁持有者。如果不是,即调用await的时候未获取锁,将node的状态改为取消状态,1。后继节点入条件队列的时候,会把所有取消状态的节点出队。
/**AQS的内部类Node 主要关注这几个属性*/
static final class Node {
    static final int CONDITION = -2;
    volatile int waitStatus;
    volatile Thread thread;
    Node nextWaiter;
}
public class ReentrantLock implements Lock, java.io.Serializable {
    /**ReentrantLock.newCondition -> sync.newCondition -> AQS的内部类ConditionObject的构造方法 */
    public Condition newCondition() {
        return sync.newCondition();
    }
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    /** */
    public class ConditionObject implements Condition, java.io.Serializable {
        //指向条件队列的第一个node节点
        private transient Node firstWaiter;
        //指向条件队列的最后一个node节点
        private transient Node lastWaiter;

        public ConditionObject() { }


        public final void await() throws InterruptedException {
            //判断当前线程是否是中断状态,如果是则直接给个中断异常了..
            if (Thread.interrupted())
                throw new InterruptedException();
            //将调用await方法的线程包装成为node并且加入到条件队列中,并返回当前线程的node。
            Node node = addConditionWaiter();
            //完全释放掉当前线程对应的锁(将state置为0)
            //为什么要释放锁呢?  加着锁 挂起后,谁还能救你呢?
            int savedState = fullyRelease(node);
            //0 在condition队列挂起期间未接收过中断信号
            //-1 在condition队列挂起期间接收到中断信号了
            //1 在condition队列挂起期间为未接收到中断信号,但是迁移到“阻塞队列”之后 接收过中断信号。
            int interruptMode = 0;
            //SyncQueue是指阻塞队列
            //isOnSyncQueue 返回true 表示当前线程对应的node已经迁移到 “阻塞队列” 了
            //返回false 说明当前node仍然还在 条件队列中,需要继续park!
            while (!isOnSyncQueue(node)) {
                //挂起当前node对应的线程。  接下来去看signal过程...
                LockSupport.park(this);
                //什么时候会被唤醒?都有几种情况呢?
                //1.常规路径:外部线程获取到lock之后,调用了 signal()方法 转移条件队列的头节点到 阻塞队列, 当这个节点获取到锁后,会唤醒。
                //2.转移至阻塞队列后,发现阻塞队列中的前驱节点状态 是 取消状态,此时会唤醒当前节点
                //3.当前节点挂起期间,被外部线程使用中断唤醒..
                //checkInterruptWhileWaiting :就算在condition队列挂起期间 线程发生中断了,
                // 对应的node也会被迁移到 “阻塞队列”。
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //执行到这里,就说明 当前node已经迁移到 “阻塞队列”了

            //acquireQueued :竞争队列的逻辑..
            //条件一:返回true 表示在阻塞队列中 被外部线程中断唤醒过..
            //条件二:interruptMode != THROW_IE 成立,说明当前node在条件队列内 未发生过中断
            //设置interruptMode = REINTERRUPT
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            //考虑下 node.nextWaiter != null 条件什么时候成立呢?
            //其实是node在条件队列内时 如果被外部线程 中断唤醒时,会加入到阻塞队列,但是并未设置nextWaiter = null。
            if (node.nextWaiter != null) // clean up if cancelled
                //清理条件队列内取消状态的节点..会将条件队列中的取消状态的节点全部出队
                unlinkCancelledWaiters();

            //条件成立:说明挂起期间 发生过中断(1.条件队列内的挂起 2.条件队列之外的挂起)
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
        /** 调用await方法的线程 都是 持锁状态的,也就是说 addConditionWaiter 这里不存在并发!*/
        private Node addConditionWaiter() {
            1.如果条件队列中有取消状态的节点,就会清理掉条件队列中所有取消状态的节点 unlinkCancelledWaiters();
            2.为当前线程生成Node节点。将node节点入队。
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            3.更新队头和队尾指针。firstWaiter、lastWaiter
             //返回当前线程的node
             return node;
        }
        /** AQS的方法*/
        final int fullyRelease(Node node) {
            //完全释放锁是否成功,
            成功的情况:
            1.调用release(savedState)方法去释放锁
            2.如果完全释放锁成功了,会返回调用await方法前,当前线程入锁的层数,AQS.state。
            因为ReentrantLock是可重入锁.signal之后,当前node会被迁移到阻塞队列中。在阻塞队列中被唤醒后,
            AQS.state等于0,需要重新加锁,让state变为调用await方法前的值。
            acquireQueued(node, savedState)

            失败的情况:
            当前线程未持有锁时,调用了await方法。
            因为调用fullyRelease之前,线程已经入队了。但入队的时候,没有判断线程是否持有锁。
            1.在finally代码块中 会将刚刚加入到 条件队列的 当前线程对应的node状态 修改为 取消状态
            2.后继线程就会将 取消状态的 节点 给清理出去了..
        }
        private int checkInterruptWhileWaiting(Node node) {
            //Thread.interrupted() 返回当前线程中断标记位,并且重置当前标记位 为 false 。
            return Thread.interrupted() ?
                    //transferAfterCancelledWait 这个方法只有在线程是被中断唤醒时 才会调用!
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

        final boolean transferAfterCancelledWait(Node node) {
            //条件成立:说明当前node一定是在 条件队列内,因为signal 迁移节点到阻塞队列时,
            //会将节点的状态修改为0
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                //中断唤醒的node也会被加入到 阻塞队列中!!
                enq(node);
                //true:表示是在条件队列内被中断的.
                return true;
            }    
            //执行到这里有几种情况?
            //1.当前node已经被外部线程调用 signal 方法将其迁移到 阻塞队列内了。
            //2.当前node正在被外部线程调用 signal 方法将其迁移至 阻塞队列中 进行中状态..
            while (!isOnSyncQueue(node))
                Thread.yield();
    
            //false:表示当前节点被中断唤醒时 不在 条件队列了..
            return false;
        }

        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            //条件成立:说明在条件队列内发生过中断,此时await方法抛出中断异常
            if (interruptMode == THROW_IE)
                throw new InterruptedException();

            //条件成立:说明在条件队列外发生的中断,此时设置当前线程的中断标记位 为true
            //中断处理 交给 你的业务处理。 如果你不处理,那什么事 也不会发生了...
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

        public final void signal() {
            //判断调用signal方法的线程是否是独占锁持有线程,如果不是,直接抛出异常..
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();

            //获取条件队列的一个node
            Node first = firstWaiter;
            //第一个节点不为null,则将第一个节点 迁移到 阻塞队列
            if (first != null)
                doSignal(first);
        }

        /**
         * 迁移队头节点
         * 帮助队头节点出队
         */
        private void doSignal(Node first) {
            do {
                //firstWaiter = first.nextWaiter 因为当前first马上要出条件队列了,
                //所以更新firstWaiter为 当前节点的下一个节点..
                //如果当前节点的下一个节点 是 null,说明条件队列只有当前一个节点了...当前出队后,整个队列就空了..
                //所以需要更新lastWaiter = null
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                //当前first节点 出 条件队列。断开和下一个节点的关系.
                first.nextWaiter = null;
                //transferForSignal(first)
                //boolean:true 当前first节点迁移到阻塞队列成功  false 迁移失败...
                //while循环 :(first = firstWaiter) != null  当前first迁移失败,则将first更新为 first.next 继续尝试迁移..
                //直至迁移某个节点成功,或者 条件队列为null为止。signal必须成功迁移一个节点。
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
        /**AQS的方法 */
        final boolean transferForSignal(Node node) {
            //cas修改当前节点的状态,修改为0,因为当前节点马上要迁移到 阻塞队列了
            //成功:当前节点在条件队列中状态正常。
            //失败:1.取消状态 (线程await时 未持有锁,最终线程对应的node会设置为 取消状态)
            //     2.node对应的线程 挂起期间,被其它线程使用 中断信号 唤醒过...(就会主动进入到 阻塞队列,这时也会修改状态为0)
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;

            //enq最终会将当前 node 入队到 阻塞队列,p 是当前节点在阻塞队列的 前驱节点.
            Node p = enq(node);

            //ws 前驱节点的状态..
            int ws = p.waitStatus;
            //条件一:ws > 0 成立:说明前驱节点的状态在阻塞队列中是 取消状态,唤醒当前节点。
            //条件二:前置条件(ws <= 0),
            //compareAndSetWaitStatus(p, ws, Node.SIGNAL) 返回true 表示设置前驱节点状态为 SIGNAl状态成功
            //compareAndSetWaitStatus(p, ws, Node.SIGNAL) 返回false  ===> 什么时候会false?
            //当前驱node对应的线程 是 lockInterrupt入队的node时,是会响应中断的,外部线程给前驱线程中断信号之后,前驱node会将
            //状态修改为 取消状态,并且执行 出队逻辑..
            //前驱节点状态 只要不是 0 或者 -1 那么,就唤醒当前node封装的线程。
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                //唤醒当前node对应的线程...回头再说。
                LockSupport.unpark(node.thread);

            return true;
        }

    }

}



以上是关于Condition条件队列核心源码分析,AQS的独占模式的应用的主要内容,如果未能解决你的问题,请参考以下文章

AQS源码探究_05 Conditon条件队列(手写一个入门的BrokingQueue)

多线程(十一AQS原理-ReentrantLock的条件队列Condition)

AQS源码剖析第二篇--公平与非公平,条件队列和线程中断

AQS源码探究_06 Conditon条件队列(await方法线程入队与挂起signal方法)

AQS源码探究_06 Conditon条件队列(await方法线程入队与挂起signal方法)

面试 LockSupport.park()会释放锁资源吗?