Condition用例源码分析详解(上)

Posted 47gamer

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Condition用例源码分析详解(上)相关的知识,希望对你有一定的参考价值。

在前面学习 synchronized 的时候,有讲到 wait/notify 的基本使用,结合 synchronized 可以实现对线程的通信。那么这个时候我就在思考了,既然 J.U.C 里面提供了锁的实现机制,那 J.U.C 里面有没有提供类似的线程通信的工具呢?
于是找阿找,发现了一个 Condition 工具类。Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒。
Condition 的基本使用
ConditionWait如下操作:
public class ConditionDemoWait implements Runnable {
private Lock lock;
private Condition condition;

public ConditionDemoWait(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}

@Override
public void run() {
System.out.println("begin - ConditionDemoWait");
try {
lock.lock();
condition.await();
System.out.println("end - ConditionDemoWait");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

ConditionSignal如下操作:
public class ConditionDemoSignal implements Runnable {
private Lock lock;
private Condition condition;

public ConditionDemoSignal(Lock lock,Condition condition) {
this.lock = lock;
this.condition = condition;
}

@Override
public void run() {
System.out.println("begin - ConditionDemoSignal");
try {
lock.lock();
condition.signal();
System.out.println("end - ConditionDemoSignal");
} finally {
lock.unlock();
}
}
}

通过这个案例简单实现了 wait 和 notify 的功能,当调用await 方法后,当前线程会释放锁并等待,而其他线程调用condition 对象的 signal 或者 signalall 方法通知并被阻塞的线程,然后自己执行 unlock 释放锁,被唤醒的线程获得之前的锁继续执行,最后释放锁。
所以,condition 中两个最重要的方法,一个是 await,一个是 signal 方法
await:把当前线程阻塞挂起 
signal:唤醒阻塞的线程
 
Condition 源码分析
调用 Condition,需要获得 Lock 锁,所以意味着会存在一个 AQS 同步队列,在上面那个案例中,假如两个线程同时运行的话,那么 AQS 的队列可能是下面这种情况:
技术图片
那么这个时候 ThreadA 调用了 condition.await 方法,它做了什么事情呢?
 
condition.await 源码分析如下:
调用 Condition 的 await()方法(或者以 await 开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从 await()方法返回时,当前线程一定获取了Condition 相关联的锁。
 
public final void await() throws InterruptedException {

if (Thread.interrupted()) throw new InterruptedException(); //表示 await 允许被中断
Node node = addConditionWaiter(); //创建一个新的节点,节点状态为 condition,采用的数据结构仍然是链表
int savedState = fullyRelease(node); //释放当前的锁,得到锁的状态,并唤醒 AQS 队列中的一个线程
int interruptMode = 0;
//如果当前节点没有在同步队列上,即还没有被 signal,则将当前线程阻塞
while (!isOnSyncQueue(node)) {//判断这个节点是否在 AQS 队列上,第一次判断的是 false,因为前面已经释放锁了
LockSupport.park(this); //通过 park 挂起当前线程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;
}
// 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
// interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
// 将这个变量设置成 REINTERRUPT.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;
// 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点.
// 如果是 null ,就没有什么好清理的了.
if (node.nextWaiter != null) unlinkCancelledWaiters(); // clean up if cancelled
// 如果线程被中断了,需要抛出异常.或者什么都不做
if (interruptMode != 0) reportInterruptAfterWait(interruptMode);

}

addConditionWaiter源码分析如下:
这个方法的主要作用是把当前线程封装成 Node,添加到等待队列。这里的队列不再是双向链表,而是单向链表。

private Node addConditionWaiter() {
  Node t = lastWaiter;
  // 如 果 lastWaiter 不 等 于 空 并 且waitStatus 不等于 CONDITION 时,把冲好这个节点从链表中移除
  if (t != null && t.waitStatus != Node.CONDITION) {
    unlinkCancelledWaiters();
    t = lastWaiter;
  }
  //构建一个 Node,waitStatus=CONDITION。这里的链表是一个单向的,所以相比 AQS 来说会简单很多
  Node node = new Node(Thread.currentThread(), Node.CONDITION);
  if (t == null){
    firstWaiter = node;
  } else {
    t.nextWaiter = node;
    lastWaiter = node;
  }
  return node;
}
图解分析
执行完 addConditionWaiter 这个方法之后,就会产生一个这样的 condition 队列
技术图片
fullyRelease
fullRelease,就是彻底的释放锁,什么叫彻底呢,就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零。

final int fullyRelease(Node node) {
  boolean failed = true;
  try {
    int savedState = getState();
    // 获得重入的次数
    if (release(savedState)) {// 释放锁并且唤醒下一个同步队列中的线程
      failed = false;
      return savedState;
    } else {
  throw new IllegalMonitorStateException();
    }
  } finally {
    if (failed)
    node.waitStatus = Node.CANCELLED;
  }
}

图解分析
此时,同步队列会触发锁的释放和重新竞争。ThreadB 获得了锁。
技术图片

 

isOnSyncQueue
判断当前节点是否在同步队列中,返回 false 表示不在,返回 true 表示在如果不在 AQS 同步队列,说明当前节点没有唤醒去争抢同步锁,所以需要把当前线程阻塞起来,直到其他的线程调用 signal 唤醒如果在 AQS 同步队列,意味着它需要去竞争同步锁去获得执行程序执行权限为什么要做这个判断呢?原因是在 condition 队列中的节点会重新加入到 AQS 队列去竞争锁。也就是当调用 signal的时候,会把当前节点从 condition 队列转移到 AQS 队列。
? 大家思考一下,基于现在的逻辑结构。如何去判断ThreadA 这个节点是否存在于 AQS 队列中呢?
1. 如果 ThreadA 的 waitStatus 的状态为 CONDITION,说明它存在于 condition 队列中,不在 AQS 队列。因为AQS 队列的状态一定不可能有 CONDITION
2. 如果 node.prev 为空,说明也不存在于 AQS 队列,原因是 prev=null 在 AQS 队列中只有一种可能性,就是它是head 节点,head 节点意味着它是获得锁的节点。
3. 如果 node.next 不等于空,说明一定存在于 AQS 队列中,因为只有 AQS 队列才会存在 next 和 prev 的关系
4. findNodeFromTail,表示从 tail 节点往前扫描 AQS 队列,一旦发现 AQS 队列的节点和当前节点相等,说明节点一定存在于 AQS 队列中

final boolean isOnSyncQueue(Node node) {
  if (node.waitStatus == Node.CONDITION || node.prev == null)
    return false;
  if (node.next != null) // 如果成功,必在队列里
    return true;
  return findNodeFromTail(node);
}

 到此Condition的await方法讲解结束,下一篇讲解signal方法。

以上是关于Condition用例源码分析详解(上)的主要内容,如果未能解决你的问题,请参考以下文章

详解AQS中的condition源码原理

Python Unittest - 根据不同测试环境跳过用例详解

Condition源码分析

Condition图解与源码分析

javajava 并发编程 Condition 源码分析

[源码分析]ReentrantLock & AbstractQueuedSynchronizer & Condition