jdk1.8 J.U.C并发源码阅读------AQS之conditionObject内部类分析

Posted Itzel_yuki

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jdk1.8 J.U.C并发源码阅读------AQS之conditionObject内部类分析相关的知识,希望对你有一定的参考价值。

一、继承关系

 public class ConditionObject implements Condition, java.io.Serializable 
实现了Condition接口和Serializable接口,是AbstractQueuedSynchronizer类的一个内部类。

二、成员变量

/** First node of condition queue. */
private transient Node firstWaiter;//Condition队列的头指针
/** Last node of condition queue. */
private transient Node lastWaiter;//Condition队列的尾指针

Condition队列的节点类型是AQS中另一个内部类Node类型的。

Node源码如下:

static final class Node 
       //标志Node的状态:独占状态。
        static final Node SHARED = new Node();
        //共享状态
        static final Node EXCLUSIVE = null;

        //因为超时或者中断,node会被设置成取消状态,被取消的节点时不会参与到竞争中的,会一直保持取消状态不会转变为其他状态;
		//CLH队列中使用
        static final int CANCELLED =  1;
       //该节点的后继节点被阻塞,当前节点释放锁或者取消的时候(cancelAcquire)需要唤醒后继者。
        //CLH队列中使用
		static final int SIGNAL    = -1;
       //CONDITION队列中的状态,CLH队列中节点没有该状态,当将一个node从CONDITION队列中transfer到CLH队列中时,状态由CONDITION转换成0
	   //CLH队列不使用,CONDITION队列中使用
        static final int CONDITION = -2;
        //该状态表示下一次节点如果是Shared的,则无条件获取锁。
		//CLH队列中使用
        static final int PROPAGATE = -3;

        
		 //当一个新的node在CLH队列中被创建时初始化为0,在CONDITION队列中创建时被初始化为CONDITION状态
        volatile int waitStatus;

        //队列中的前驱节点
		//CONDITION队列中使用
        volatile Node prev;

		 //CLH队列中使用,指向下一个节点的引用
        volatile Node next;

        //当前线程
        volatile Thread thread;

		//CONDITION队列中指向下一个node的指针,CLH队列中不使用
        Node nextWaiter;

        
        final boolean isShared() 
            return nextWaiter == SHARED;
        

       //返回前驱节点
        final Node predecessor() throws NullPointerException 
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        

        Node()     // Used to establish initial head or SHARED marker
        

        Node(Thread thread, Node mode)      // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        

        Node(Thread thread, int waitStatus)  // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        
    
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT =  1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE    = -1;


三、方法分析

(1)await():实现考虑中断的等待,若thread中断,则抛出异常。


总结:

        await只能在当前线程获取了锁之后调用。因此CLH队列和CONDITION队列的情况为:当前处于CLH队列队首的节点调用await方法,新new一个node,添加到CONDITION队列队尾,然后在CLH队列队首释放当前线程占有的锁,唤醒后继节点。当前线程以新node的形式在CONDITION队列中park,等待被唤醒。


具体过程:
step1:将该线程封装成node,新节点的状态为CONDITION,添加到队列尾部
step2:尝试释放当前线程占有的锁,释放成功,则调用unparkSuccessor方法唤醒该节点在CLH队列中的后继节点。
step3:在while循环中调用isOnSyncQueue方法检测node是否再次transfer到CLH队列中(其他线程调用signal或signalAll时,该线程可能从CONDITION队列中transfer到CLH队列中),如果没有,则park当前线程,等待唤醒,同时调用checkInterruptWhileWaiting检测当前线程在等待过程中是否发生中断,设置interruptMode的值来标志中断状态。如果检测到当前线程已经处于CLH队列中了,则跳出while循环。
step4:调用acquireQueued阻塞方法来在CLH队列中获取锁。
step5:检查interruptMode的状态,在最后调用reportInterruptAfterWait统一抛出异常或发生中断。

基本流程:首先将node加入condition队列,然后释放锁,挂起当前线程等待唤醒,唤醒后重新在CLH队列中调用acquireQueued获取锁。(实现Object.wait方法的功能)


注意:

step1和step2的顺序不能颠倒,否则CONDITION队列尾部添加新节点可能需要考虑并发的情况。首先将该节点添加到队列尾部,然后在CLH队列首节点(当前线程拥有锁,一定在CLH队列的队首)释放锁并唤醒后继节点。
CONDITION队列节点状态分析:CONDITION状态和CANCELLED状态,新new的一个节点状态为CONDITION,当该节点在CLH队列队首执行fullyRelease时,释放锁失败,则该node在CONDITION队列中的状态会变成CANCELLED。(还可能为0,当超时将node从CONDITION队列中加到CLH队列中,该节点在CONDITION队列中的nextWaiter连接没有取消,此时该节点状态为0,不过后面调用unlinkCancelledWaiters将清除CONDITION队列中所有非CONDITION状态的节点)
await系列方法和signal系列都是在当前线程占有锁时才能正确执行,否则会抛出异常。


源码分析

public final void await() throws InterruptedException 
            if (Thread.interrupted())
                throw new InterruptedException();
			//将该线程添加到CONDITION队列中
            Node node = addConditionWaiter();
			//该节点加入condition队列中等待,await则需要释放掉当前线程占有的锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
			//判断该节点是否在CLH队列中
            while (!isOnSyncQueue(node)) 
			//不在,则阻塞该节点
                LockSupport.park(this);
				//在阻塞的过程中发生中断
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            
			//出了while循环,代表线程被唤醒,并且已经将该node从CONDITION队列transfer到了CLH队列中
			//acquireQueued在队列中获取锁,会阻塞当前线程,并且在上面while循环等待的过程中没有发生异常,则修改interruptMode状态为REINTERRUPT
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
			//该节点调用transferAfterCancelledWait添加到CLH队列中的,此时该节点的nextWaiter不为null,需要调用unlinkCancelledWaiters将该节点从CONDITION队列中删除,该节点的状态为0
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
				//如果interruptMode不为0,则代表该线程在上面过程中发生了中断或者抛出了异常,则调用reportInterruptAfterWait方法在此处抛出异常
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        
	//首先检查尾节点是否为cancelled状态的节点,如果是则调用unlinkCancelledWaiters删除CONDITION队列中所有cancelled状态的节点,不是,则直接将该新创建的节点添加到CONDITION队列的末尾。
	private Node addConditionWaiter() 
			//尾指针
            Node t = lastWaiter;
            //如果尾节点状态是cancelled,则调用unlinkCancelledWaiters方法删除CONDITION链表中所有cancelled状态的节点
            if (t != null && t.waitStatus != Node.CONDITION) 
                unlinkCancelledWaiters();
				//t为新的尾节点
                t = lastWaiter;
            
			//创建一个node节点,状态为CONDITION
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
			//添加到队尾
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        
	//遍历一次CONDITION链表,删除状态为CANCELLED的节点。
	private void unlinkCancelledWaiters() 
			//首节点
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) 
				//下一个节点
                Node next = t.nextWaiter;
				//如果t的状态是cancelled的,则需要删除t
                if (t.waitStatus != Node.CONDITION) 
				//清除t的nextWaiter连接
                    t.nextWaiter = null;
					//删除的是首节点
                    if (trail == null)
                        firstWaiter = next;
                    else
					//直接将前一个节点的连接指向该节点的下一个节点
                        trail.nextWaiter = next;
					//设置新的尾节点
                    if (next == null)
                        lastWaiter = trail;
                
				//状态为CONDITION的节点不需要清除
                else
                    trail = t;
                t = next;
            
        
	//完全释放锁,释放成功则返回,失败则将当前节点的状态设置成cancelled表示当前节点失效
	final int fullyRelease(Node node) 
        boolean failed = true;
        try 
		//获取当前锁重入的次数
            int savedState = getState();
			//释放锁
            if (release(savedState)) 
			//释放成功
                failed = false;
                return savedState;
             else 
                throw new IllegalMonitorStateException();
            
         finally 
		//释放锁失败,则当前节点的状态变为cancelled(此时该节点在CONDITION队列中)
            if (failed)
                node.waitStatus = Node.CANCELLED;
        
    
	//尝试释放锁,释放成功则调用unparkSuccessor唤醒后继节点
	public final boolean release(int arg) 
	//调用tryRelease释放锁。
        if (tryRelease(arg)) 
		//释放成功,则查看head节点状态,如果不为null且状态不为0(为0表示没有后继或者当前节点已经unparkSuccessor过),则调用unparkSuccessor唤醒后继节点
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        
        return false;
    
	//判断该节点是否在CLH队列中
	final boolean isOnSyncQueue(Node node) 
	//如果该节点的状态为CONDITION(该状态只能在CONDITION队列中出现,CLH队列中不会出现CONDITION状态),或者该节点的prev指针为null,则该节点一定不在CLH队列中
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
		//如果该节点的next(不是nextWaiter,next指针在CLH队列中指向下一个节点)状态不为null,则该节点一定在CLH队列中
        if (node.next != null) // If has successor, it must be on queue
            return true;
        //否则只能遍历CLH队列(从尾节点开始遍历)查找该节点
        return findNodeFromTail(node);
    
	//从尾节点开始,使用prev指针,遍历整个CLH队列
	private boolean findNodeFromTail(Node node) 
        Node t = tail;
		//从尾节点开始,使用prev指针,开始遍历整个CLH队列
        for (;;) 
		//找到该节点
            if (t == node)
                return true;
			//遍历完成,没有找到该节点
            if (t == null)
                return false;
            t = t.prev;
        
    
	//在等待后发生中断,在此处根据interruptMode统一处理
	 private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException 
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        

(2)awaitNanos:如果当前线程发生中断,则抛出异常;超时则强制transfer到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消)


与await的区别:
(1)超时则强制将该节点从CONDITION队列transfer到CLH队列中。
(2)阻塞调用的是LockSupport.parkNanos(this, nanosTimeout),带有时间
(3)每次循环都要更新nanosTimeout,如果超时则发生(1)

public final long awaitNanos(long nanosTimeout)
                throws InterruptedException 
            if (Thread.interrupted())
                throw new InterruptedException();
			//新创建一个CONDITION状态的节点,并加入队列尾部
            Node node = addConditionWaiter();
			//node(此时处于CLH队列队首)释放占有的锁(在CLH队列中出队了),并且唤醒后继节点
            int savedState = fullyRelease(node);
			//根据nanosTimeout,计算deadline
            final long deadline = System.nanoTime() + nanosTimeout;
            int interruptMode = 0;
			//检测当前节点是否处于CLH队列中,没有则park当前线程,等待signal唤醒(从而将node节点从CONDITION队列中transfer到CLH队列中)
            while (!isOnSyncQueue(node)) 
			//如果超时,则调用transferAfterCancelledWait将当前Node强制transfer到CLH队列中
                if (nanosTimeout <= 0L) 
                    transferAfterCancelledWait(node);
                    break;
                
				//nanosTimeout大于spinForTimeoutThreshold,则调用parkNanos等待nanosTimeout时间
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
				//park的过程中发生中断,则跳出循环
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
				//更新nanosTimeout
                nanosTimeout = deadline - System.nanoTime();
            
			//出了while循环,代表线程被唤醒,并且已经将该node从CONDITION队列transfer到了CLH队列中,或者发生中断
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
			//该节点调用transferAfterCancelledWait添加到CLH队列中的,此时该节点的nextWaiter不为null,需要调用unlinkCancelledWaiters将该节点从CONDITION队列中删除
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
			//统一处理上面发生的中断或者异常情况。
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
			//返回距离超时还剩多长时间
            return deadline - System.nanoTime();
        
	final boolean transferAfterCancelledWait(Node node) 
	//将该节点状态由CONDITION变成0,调用enq将该节点从CONDITION队列添加到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消)
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) 
            enq(node);
            return true;
        
        //循环检测该node是否已经成功添加到CLH队列中
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    

(3)awaitUntil:在deadline时间之前没有被唤醒,则强制transfer到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消),发生中断则抛出异常

public final boolean awaitUntil(Date deadline)
                throws InterruptedException 
				//获取绝对时间
            long abstime = deadline.getTime();
            if (Thread.interrupted())
                throw new InterruptedException();
			//新new一个node,添加到CLH队列的尾部
            Node node = addConditionWaiter();
			//释放锁
            int savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
			//循环检测该node是否已经成功添加到CLH队列中
            while (!isOnSyncQueue(node)) 
			//超时,强制transfer到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消)
                if (System.currentTimeMillis() > abstime) 
                    timedout = transferAfterCancelledWait(node);
                    break;
                
				//park到abstime时间
                LockSupport.parkUntil(this, abstime);
				//发生异常,跳出循环
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            
			//已经在CLH队列中了,或者抛出了异常
			//调用acquireQueued(同步阻塞方法)在CLH队列中获取锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
			//该节点调用transferAfterCancelledWait添加到CLH队列中的,此时该节点的nextWaiter不为null,需要调用unlinkCancelledWaiters将该节点从CONDITION队列中删除
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
			//统一处理上面发生的中断或者异常情况。
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
			//返回是否超时
            return !timedout;
        

(4)await(long time, TimeUnit unit):中断则抛出异常;超时强制转换到CLH队列中(在CONDITION队列中的nextWaiter连接并没有取消,此时同时处于CLH队列和CONDITION队列中)

public final boolean await(long time, TimeUnit unit)
                throws InterruptedException 
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
			//新new一个node,添加到CONDITION队列末尾
            Node node = addConditionWaiter();
			//释放锁(此节点在CLH队列中拥有锁,此时是CLH队列头结点)并唤醒后继节点。
            int savedState = fullyRelease(node);
			//计算deadline
            final long deadline = System.nanoTime() + nanosTimeout;
            boolean timedout = false;
            int interruptMode = 0;
			//检测当前节点是否处于CLH队列中,没有则park当前线程,等待signal唤醒(从而将node节点从CONDITION队列中transfer到CLH队列中)
            while (!isOnSyncQueue(node)) 
                if (nanosTimeout <= 0L) 
                    timedout = transferAfterCancelledWait(node);
                    break;
                
				//park
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
				//更新nanosTimeout
                nanosTimeout = deadline - System.nanoTime();
            
			//在CLH队列中获取锁,或者发生中断
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
			//统一处理上面发生的中断或者异常情况。
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        

(5)awaitUninterruptibly:忽略中断的等待

public final void awaitUninterruptibly() 
	//将该线程封装成node,新节点的状态为CONDITION,添加到队列尾部
            Node node = addConditionWaiter();
			//在CLH队列首部释放占有的锁
            int savedState = fullyRelease(node);
            boolean interrupted = false;
			//循环检测该node是否已经成功添加到CLH队列中
            while (!isOnSyncQueue(node)) 
                LockSupport.park(this);
				//用interrupted保存中断标志,不抛出异常
                if (Thread.interrupted())
                    interrupted = true;
            
			//在CLH队列中获取锁 或者 interrupted发生中断了,则调用selfInterrupt发生中断
			//acquireQueued是一个阻塞方法
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        

(6)signal:对CONDITION队列中第一个CONDITION状态的节点(将该节点以及前面的CANCELLED状态的节点从CONDITION队列中出队),将该节点从CONDITION队列中添加到CLH队列末尾,同时需要设置该节点在CLH队列中前驱节点的状态(若前驱节点为cancelled状态或者给前驱节点执行CAS操作失败,则需要调用park操作在此处唤醒该线程,否则就是在CLH队列中设置前驱节点的signal状态成功,则不用在此处唤醒该线程,唤醒工作交给前驱节点,可以少进行一次park和unpark操作)

//唤醒CONDITION队列中首部的第一个CONDITION状态的节点
	public final void signal() 
			//判断锁是否被当前线程独占,如果不是,则当前线程不能signal其他线程
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
			//CONDITION队列不为null,则doSignal方法将唤醒CONDITION队列中所有的节点线程
            if (first != null)
                doSignal(first);
        
	//对CONDITION队列中从首部开始的第一个CONDITION状态的节点,执行transferForSignal操作,将node从CONDITION队列中转换到CLH队列中,同时修改CLH队列中原先尾节点的状态
	private void doSignal(Node first) 
            do 
				//当前循环将first节点从CONDITION队列transfer到CLH队列
				//从CONDITION队列中删除first节点,调用transferForSignal将该节点添加到CLH队列中,成功则跳出循环
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
             while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        
	//两步操作,首先enq将该node添加到CLH队列中,其次若CLH队列原先尾节点为CANCELLED或者对原先尾节点CAS设置成SIGNAL失败,则唤醒node节点;否则该节点在CLH队列总前驱节点已经是signal状态了,唤醒工作交给前驱节点(节省了一次park和unpark操作)
	final boolean transferForSignal(Node node) 
        //如果CAS失败,则当前节点的状态为CANCELLED
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
		//enq将node添加到CLH队列队尾,返回node的prev节点p
        Node p = enq(node);
        int ws = p.waitStatus;
		//如果p是一个取消了的节点,或者对p进行CAS设置失败,则唤醒node节点,让node所在线程进入到acquireQueue方法中,重新进行相关操作
		//否则,由于该节点的前驱节点已经是signal状态了,不用在此处唤醒await中的线程,唤醒工作留给CLH队列中前驱节点
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    

(7)signalAll:将CONDITION队列中所有node出队,逐个添加到CLH队列末尾,同时修改它们在CLH队列中前驱节点的状态,修改为signal成功,则不用在此处唤醒该节点的线程,唤醒工作交给CLH队列中的前驱节点,否则需要在此处park当前线程。

public final void signalAll() 
	//查看当前线程是否独占锁,若不是,则当前线程没有权限执行signalAll操作,抛出异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
			//唤醒CONDITION队列中所有节点,同时transfer到CLH队列中
            if (first != null)
                doSignalAll(first);
        
	private void doSignalAll(Node first) 
            lastWaiter = firstWaiter = null;
            do 
				//将first节点从CONDITION队列中出队
                Node next = first.nextWaiter;
                first.nextWaiter = null;
				//将first节点在CLH队列中入队,同时可能需要执行unpark操作
                transferForSignal(first);
				//更新first的指向
                first = next;
             while (first != null);
        

















以上是关于jdk1.8 J.U.C并发源码阅读------AQS之conditionObject内部类分析的主要内容,如果未能解决你的问题,请参考以下文章

jdk1.8 J.U.C并发源码阅读------ReentrantLock源码解析

jdk1.8 J.U.C并发源码阅读------ReentrantLock源码解析

jdk1.8 J.U.C并发源码阅读------CountDownLatch源码解析

jdk1.8 J.U.C并发源码阅读------CountDownLatch源码解析

jdk1.8 J.U.C并发源码阅读------CyclicBarrier源码解析

jdk1.8 J.U.C并发源码阅读------CyclicBarrier源码解析