java并发之ReentrantLock.Condition实现原理

Posted Leo Han

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java并发之ReentrantLock.Condition实现原理相关的知识,希望对你有一定的参考价值。

一般我们在实际开发过程中经常会遇到一种情况需要在满足一个条件才能继续,比如我们使用生产-消费模型的时候,消费服务必须有数据消费,如果没数据则等待,当生产线程产生数据的时候,唤醒消费线程。示例代码如下:


ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        new Thread(()->
            

                try
                    lock.lock();
                    // do something A1
                   condition.await();
                   // do something A2
                catch(Exception e)

                
                finally
                    lock.unlock();
                
            
        ).start();

        new Thread(()->
        
            try
                lock.lock();
                // do something A1
                condition.signal();
                // do something A2
            catch(Exception e)

            
            finally
                lock.unlock();
            
        
        ).start();

这里就用到了并发包中Condition作为条件队里使用,我们看下其底层实现逻辑,当我们调用lock.newCondition()的时候,

public Condition newCondition() 
        return sync.newCondition();
    
final ConditionObject newCondition() 
            return new ConditionObject();
        

ConditionObject则是在AQS中定义。

public class ConditionObject implements Condition, java.io.Serializable 
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

可以看到,这里ConditionObject中内部维护了一个链表,复用了AQS中Node数据类型。

而我们调用await的时候,实现如下:


public final void await() throws InterruptedException 
            if (Thread.interrupted())
                throw new InterruptedException();
            // 新增条件Node添加到条件队里尾部,这里会将Node节点的waitStatus设置为CONDITION    
            Node node = addConditionWaiter();
            // 这一步很重要,我们在调用await的时候,当前线程肯定是获取到锁了,而基于前面的分析,获取到锁的时候,
            // AQS中state变量会改变,这里会首先获取当前线程获取到的锁的资源,也就是state变量,
            // 这样后面在重新获取锁之后,需要将当前线程锁资源恢复到await之前的水准。
            // fullyRelease 的逻辑是:会将state设置为0,也就是释放了当前线程的所有锁资源,这里会调用AQS的release方法,
            // 在release的时候,如果头结点是一个从条件队列结点过来的,则会唤醒这个节点的线程
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 节点是否在同步队列上,不在同步队列上在阻塞,如果还在同步队列上不能够上锁,
            // 这里有可能在同步队列上,一种是刚放到条件队列立马被其他线程signal转义到同步队列上,
            // 还有一种是之前在条件队列休眠被唤醒加入到同步队列中去
            while (!isOnSyncQueue(node)) 
            	// 如果不在同步对垒上,通过LockSupport.park让当前线程休眠
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            
            // 这时候当前线程已经被signal唤醒,并且将当前Node从条件队列转移到同步队列上,开始重新请求获取锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        

final int fullyRelease(Node node) 
        boolean failed = true;
        try 
            int savedState = getState();
            if (release(savedState)) 
                failed = false;
                return savedState;
             else 
                throw new IllegalMonitorStateException();
            
         finally 
            if (failed)
                node.waitStatus = Node.CANCELLED;
        
    

public final boolean release(int arg) 
        if (tryRelease(arg)) 
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        
        return false;
    

从上面可以看到,await主要逻辑如下:

  1. 首先通过addConditionWaiter方法,将当前线程加入到条件队列的尾节点
  2. 将当前线程持有的所资源释放,同时唤醒同步队列中头结点等待线程
  3. 如果当前节点不在同步队列上,那么会将当前线程阻塞等待
  4. 当前线程被唤醒(这时候当前节点将从条件队列转移到同步队列上),通过acquireQueued去获取锁,这个逻辑与锁的请求一样

接下来我们看下signal的逻辑:

public final void signal() 
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        
private void doSignal(Node first) 
            do 
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
             while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        
final boolean transferForSignal(Node node) 
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
	    // 将条件队列上的节点转移到同步队列上
        Node p = enq(node);
        // enq返回是同步队列中加入当前节点后,当前节点的前一个节点
        int ws = p.waitStatus;
        // 如果 ws > 0 表名同步队列中当前节点的前一个节点被取消了,或者compareAndSetWaitStatus失败,那么唤醒当前节点的线程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    

从这里可以看到,signal中的主要逻辑是将条件队列中的状态正常的节点转移到同步队列中去,具体步骤如下:

  1. 通过compareAndSetWaitStatus(node, Node.CONDITION, 0)判断操作节点是否正常,通常情况下同步队列中的节点状态在初始化之后(为CONDITION)除非取消了等待,不然不会发生改变,这里通过cas操作,如果操作失败了,则当前节点的线程取消了等待,忽略该节点,继续向链表后面找到可用节点
  2. 通过enq将当前节点从条件队列转移到同步队列上
  3. enq返回的是链表中当前节点的前一个节点,和锁的逻辑类似,如果前一个节点的waitStatus >0或者将前一个节点的waitStatus设置为SIGNAL失败,则表名前一个节点有问题,直接唤醒当前节点线程
  4. 操作结束

这里我们需要注意的一点是signal方法只是将条件队列中的节点转移到了同步队列中,这时候调用signal的线程并没有锁资源,必须等待调用signal线程释放资源,后续await线程才能继续获取资源执行。
lock.newCondition()每次返回的都是一个新的Condition,而awaitsignal必须在同一个Condition上,每个Condition里面都会单独维护一个自己的条件队列,因此,可以说ReentrantLock中同步队列只有一个,但是条件队列可能会有多个。

另外通过前面ReentrantLock和这里的Condition分析来看,在实现上很多功能进行了复用,通过同步队列和条件队列,将锁和条件等待分开。

另外,通过节点的waitStatus来判断节点的线程状态。

以上是关于java并发之ReentrantLock.Condition实现原理的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程:并发容器之ConcurrentHashMap(转载)

Java并发编程:并发容器之ConcurrentHashMap

Java并发编程:并发容器之CopyOnWriteArrayList

Java并发--并发容器之ConcurrentHashMap

java并发之CopyOnWriteArraySet

Java并发之CountDownLatch的使用