同步队列器AQS之condition等待队列的实现分析

Posted yxz1025

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了同步队列器AQS之condition等待队列的实现分析相关的知识,希望对你有一定的参考价值。

 

 

我们知道每个Java对象都对应有一个监视器,此监视器用来实现线程的唤醒、通知等操作,对应Lock来说基于jdk也实现了类似的唤醒及通知的接口Condition接口,这里说明一下上一章节中同步队列器AQS的实现原理使用大量篇幅讲解了同步队列器的原理,而Condition是等待队列,下面通过BoundedBuffer(有界缓存)来分析一下使用场景

/**
*此类创建了两个condition等待队列,并通过put方法和take方法分别存放和获取数据
*在放数据时,判断如果队列已经满了,则当前线程释放锁并加入notFull队列,若队列未满,则通知notEmpty队列唤醒等待队列中的头部节点
*在拿数据时,若队列为空队列,则当前线程释放锁并加入notEmpty队列
 class BoundedBuffer 
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition(); 
    final Condition notEmpty = lock.newCondition(); 
 
    final Object[] items = new Object[100];
    int putptr, takeptr, count;
 
    public void put(Object x) throws InterruptedException 
      lock.lock();
      try 
        while (count == items.length)
          notFull.await();
        items[putptr] = x;
        if (++putptr == items.length) putptr = 0;
        ++count;
        notEmpty.signal();
       finally 
        lock.unlock();
      
    
 
    public Object take() throws InterruptedException 
      lock.lock();
      try 
        while (count == 0)
          notEmpty.await();
        Object x = items[takeptr];
        if (++takeptr == items.length) takeptr = 0;
        --count;
        notFull.signal();
        return x;
       finally 
        lock.unlock();
      
    
  

可以看到创建一个等待队列通过lock.newCondition()方法创建,ConditionObject是AQS的内部类,它也是一个Node队列,提供firstWaiter头部节点和尾部节点lastWaiter,下面重点分析await和signal方法

  • await将当前线程加入到等待队列中,并释放锁(同步状态),使用await方法一定是当前线程为同步队列的head节点并且是获取到锁的线程,所以当前节点在释放完同步状态以后重新以自旋的方式添加的同步队列的尾部等待
        public final void await() throws InterruptedException 
            if (Thread.interrupted())
                throw new InterruptedException();
            //添加一个等待节点在等待队列的尾部
            Node node = addConditionWaiter();
            //将当前线程的锁(同步状态)进行释放
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //循环判断是否当前线程在同步队列中
            while (!isOnSyncQueue(node)) 
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            
            //将当前的线程重新添加到同步队列的尾节点进行自旋
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            //如果当前节点的后继结点不是Null,则将线程的状态是condition的节点从等待队列中移除
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        

addConditionWaiter方法为在等待队列的尾部添加节点,使用Node创建一个等待节点

        private Node addConditionWaiter() 
            Node t = lastWaiter;
            //
            if (t != null && t.waitStatus != Node.CONDITION) 
                unlinkCancelledWaiters();
                t = lastWaiter;
            
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        

fullyRelease方法为释放当前线程的同步状态,并唤醒下一个同步队列中的后继节点

    final int fullyRelease(Node node) 
        boolean failed = true;
        try 
            int savedState = getState();
            if (release(savedState)) 
                failed = false;
                return savedState;
             else 
                throw new IllegalMonitorStateException();
            
         finally 
            if (failed)
                node.waitStatus = Node.CANCELLED;
        
    
  • signal方法为将等待队列中的等待时间最久的线程唤醒,移除队列并加入到同步队列中,也就是firstWaiter节点
        public final void signal() 
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        

doSignal方法目的是将等待队列中的节点是null或者线程中断的节点移除

        private void doSignal(Node first) 
            do 
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
             while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        
transferForSignal方法将等待队列中的节点转移到同步队列中
    final boolean transferForSignal(Node node) 
        /*
         * 如果不能设置这个节点的状态,说明已经被取消了
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * 创建一个同步节点加入到尾部
         * 如果当前节点为取消状态,尝试唤醒
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    

关注“聊点源码”公众号获取更多资讯 

以上是关于同步队列器AQS之condition等待队列的实现分析的主要内容,如果未能解决你的问题,请参考以下文章

队列同步器——AQS

五:抽象队列同步器AQS应用Lock详解

进阶笔录-深入理解Java线程之-AQS

Java并发之AQS同步器学习

AQS

Java并发编程- J.U.C之AQS及其相关组件详解