ReentrantLock以及 Condition深度解析

Posted 小猪快跑22

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ReentrantLock以及 Condition深度解析相关的知识,希望对你有一定的参考价值。

前言

之前写过关于AQS的文章,也讲了关于 ReetrantLock 的源码分析。这里主要是想结合源码分析下Condition 的 await 和 signal ,写ReentrantLock那么还是得提下 AQS 的。

一、关于AQS 中几个重要的属性

  1. AQS 中维护了一个很重要的变量 state, 它是int型的,表示加锁的状态,初始状态值为0;
  2. 另外 AQS 还维护了一个很重要的变量exclusiveOwnerThread,它表示的是获得锁的线程,也叫独占线程,ReentrantLock中的可重入就是用到该属性,当 state != 0 但是 exclusiveOwnerThread == 当前线程,那么就可以不需要再次走获取锁的逻辑即可重入。
  3. AQS中还有一个用来存储获取锁失败线程的队列,它是一个FIFO的双链表结构,以及 headtail 结点。
  4. 双链表的结点 Node 包含的主要属性如下:

NodeSHARED 和 EXCLUSIVE : 用于标识该结点是共享模式还是独占模式

int : waitStatus : 标识该结点的等待状态,有如下几种值:

CANCELLED:值为1,表示该结点被取消了,比如使用 tryLock(1000, TimeUnit.MILLISECONDS) 获取锁超时就会被取消

SIGNAL: 值为-1,表示此结点的后继结点的线程已经通过LockSupport.park()挂起了,需要前面的结点在释放锁或者取消的时候来唤醒该线程

CONDITION: 值为-2,表示该结点在condition队列中,一般是通过 condition.await() 操作,然后该结点插入 condition 的尾部

PROPAGATE: 值为-3,共享模式的头结点可能处于此状态,表示往下传播。假设有2个线程同时释放锁,通过竞争,其中一条负责唤醒后继结点,而另一条则将头结点设置为此状态,新结点唤醒后,直接根据头结点的此状态来唤醒 下下个结点

注意,当 new Node() 的时候 waitStatus 默认等于0。

二、ReetrantLock 的结构

  1. ReetrantLock 包含公平锁和非公平锁,默认的是非公平锁。
  2. ReetrantLock 包含一个抽象的内部类: Sync,Sync 继承了 AQS,如下:
 abstract static class Sync extends AbstractQueuedSynchronizer
  1. ReetrantLock 中的 公平和非公平锁分别继承自 Sync

三、ReetrantLock 的获取和释放锁的分析

这里就以默认的非公平锁的源码来分析,版本为 jdk1.8

ReentrantLock lock = new ReentrantLock()

try {
   lock.lock()
   ...
} finally {
  lock.unLock()
}

synchronized 不一样,ReentrantLock 是不会主动释放锁的,所以需要在 finally 块中主动调用 unLock 释放。

这里假设有A,B,C,D 4个线程同时调用 lock 获取锁,且每个线程获取锁之后执行的任务都非常耗时。

1. lock.lock()
  1. 会调用 ReentrantLock 的 lock 方法,如下:
    public void lock() {
        sync.lock();
    }

由于默认的是非公平锁,所以 调用的是 NonfairSync 的 lock 方法,如下:

final void lock() {
    // state 的默认值是0,所以第一个线程来获取锁的时候 cas 操作返回true
    // 线程释放锁的时候会把 state 再次设置为 0
    if (compareAndSetState(0, 1)) // 注释【1】
        // 把获取锁的线程设置为独占线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1); // 注释【2】
 }

由于线程 A 是第一个请求锁的,所以通过 CAS 把 AQS 的 state 值设置为1,表示 线程A已经获取了锁, 然后

此时,线程B再来获取锁的时候 走到注释【1】,此时state的值为1,所以cas 操作会失败,接着走到 注释【2】acquire 是 AQS 里面的方法,如下:

   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

上面的方法包含了3个重要的步骤:

  1. tryAcquire : 再次尝试去获取锁,该方法在是在 ReentrantLock 的内部类 NonfairSync 中。
  2. addWaiter :把当前线程转化为Node结点,且插入等待队列的末尾,如果是第一个插入等待队列的结点,还需要 new 一个空的结点作为 head
    结点。
  3. acquireQueued:这个方法 判断 node 结点的前驱结点是否是 head结点,如果是,那么再次尝试去获取锁,如果获取锁成功,那么设置当前的 node 为head结点;如果获取锁失败,那么会阻塞(挂起)当前线程。

tryAcquire

protected final boolean tryAcquire(int acquires) {
     return nonfairTryAcquire(acquires);
}
       final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 表示之前获取锁的线程已经释放了锁
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 表示之前获取锁的线程未释放锁,然后判断当前线程是否就是之前获取锁的线程
            // 如果是,那么state + 1,这里就是锁的重入了
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

tryAcquire 获取锁成功则返回 true,否则返回 false 。下面看看addWaiter(Node.EXCLUSIVE), Node.EXCLUSIVE 前面也说过,指的是独占锁,代码如下:

    private Node addWaiter(Node mode) {
        // new Node ,Node 的thread 赋值为当前线程
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 第一个获取锁失败的线程走到这里时,head 和 tail 都等于null
        if (pred != null) { 
        // 不等于null,说明等待队列里面有等待的线程了,
        // 下面的操作就是把 node 插入到队列的而尾部然后返回 node 
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 走到这里,说明等待队列为空
        enq(node);
        return node;
    }

由于 线程B 是第一个未获取到锁的线程,此时 headtail 都是 null,所以走到 enq(node):

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail; //注释【1】
            if (t == null) { // 注释【2】
                if (compareAndSetHead(new Node())) //注释【3】
                    tail = head; //注释【4】
            } else {
                node.prev = t; //注释【5】
                if (compareAndSetTail(t, node)) { //注释【6】
                    t.next = node; //注释【7】
                    return t; //注释【8】
                }
            }
        }
    }

这里画图来展示吧:
注释【1】:由于此时 tail == null ,所以注释【2】成立;
注释【3】:new 一个空的结点,然后通过 cas 操作把这个空结点设置为 head,此时,head 结点的 waitStatus 值为0(默认值)
注释【4】:让 tail 指向 head,执行完 的效果如下:

此时 head 和 tail 都指向的是一个空结点,里面对应的线程null,且 waitStatus = 0;
for 循环会再次走到 注释【1】,t 赋值为 tail,所以注释【2】条件不成立,走到 注释【5】,即 线程B 的prev指向t也就是指向 tail,如下图:

接着走到注释【6】 通过cas 操作让 tail 指向 线程B对应的 node 结点。
注释【7】的作用就是 让 tnext 指针指向 结点 node,然后返回 node 结点,如下:


到此,addWaiter(Node mode) 分析完成了,下面就说说acquireQueued(final Node node, int arg),代码如下:

   final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); // 注释【1】
                if (p == head && tryAcquire(arg)) { // 注释【2】
                    setHead(node); // 注释【3】
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())// 注释【4】
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

注意,这里面也是一个无限循环操作,此时线程B已经在等待队列里面了:

注释【1】:此时 node 结点的前驱结点是 head ,所以执行 tryAcquire,由于线程A执行耗时的操作,暂时不会释放锁,所以 注释【2】的 tryAcquire 条件不满足。走到注释【4】。

注释【4】:包含 shouldParkAfterFailedAcquireparkAndCheckInterrupt 我们逐一分析,先看 shouldParkAfterFailedAcquire,方法如下:

   private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // 注释【1】
            /***
            * 表示已经设置为SIGNAL,那么在这个pred结点释放锁的时候
            * 能够唤醒pred的后继结点对应的线程
            */
            return true;
        if (ws > 0) {
            // 表示前驱结点被取消了,那么从pred结点开始往前,
            // 从等待队列中删除所有已经取消的结点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else { // 注释【2】
            // 通过 cas 把pred结点的waitStatus 设置为 SIGNAL
            // 只有设置为SIGNAL后,才能唤醒后续结点
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

接着来, 线程B 对应结点的 前驱结点是 head 结点,前面说过了,head 结点的 waitStatus 的值为0,所有会走到 注释【2】,设置 head 的 waitStatus = -1(SIGNAL = -1),返回 false,返回到 acquireQueued 方法中,继续执行 for 循环,由于线程A 依然没有释放锁,还是走到 shouldParkAfterFailedAcquire方法,此时 pred 的waitStatus = SIGNAL,所已返回true,接着执行 acquireQueued 中的 parkAndCheckInterrupt 方法了,如下:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

这个方法很简单,调用LockSupport.park阻塞 线程B
以上,执行完成后,head 的 waitStatus 的值为 -1。如下图:

到此,线程B 的 获取锁的操作执行完毕,被阻塞了,现在轮到 线程C 来请求锁,流程是一样的,还是最终走到这里:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

由于线程A未释放锁,所以 线程C 尝试获取锁tryAcquire失败,接着走 addWaiter,此时就是将 线程C 对应的结点插入到 等待队列的尾部,插入后如下图:

然后 走到 acquireQueued,前面分析过过代码,这里就简单说下结果。这里会先判断 线程C结点的前驱结点是不是 head 结点,显然不是,线程C结点的前驱结点是 线程B对应的结点,所以会走到 shouldParkAfterFailedAcquire(pred, node),同样经过2次循环把 线程B对应的结点的waitStatus值设置为1,然后阻塞线程B。如下图所示:

同样,线程D的操作和线程C的操作是一样的,执行完获取锁的流程后如下图:

从上面的分析可知,ReentrantLock 所形成的等待队列非 tail结点的 waitStatus = -1

ReentrantLocklock方法总结:

  1. 当一个线程获取锁后,会将 state 置为1,然后 设置 exclusiveOwnerThread 为当前线程
  2. 如果当前线程再去获取锁的话,会把 state 加1 ,即锁重入
  3. 其他线程再去获取锁,如果锁未被其他线程释放,那么会把此线程对应的结点插入等待队列的末尾,然后判断此线程对应结点的前驱结点是否是 head 结点,如果是 head 结点,那么会尝试获取锁,获取到锁就直接return,未获取到锁,会把前驱结点的waitStatus值设置为SIGNAL(-1),然后阻塞此线程;如果不是head结点,同样的逻辑 ,把前驱结点的waitStatus值设置为SIGNAL(-1),然后阻塞此线程,对应如下的流程图:

以上是关于ReentrantLock以及 Condition深度解析的主要内容,如果未能解决你的问题,请参考以下文章

ReentrantLock及Condition原理解析

ReentrantLock与Condition构造有界缓存队列与数据栈

ReentrantLock 和 Condition的使用

从使用角度看 ReentrantLock 和 Condition

Java ReEntrantLock 之 Condition条件(Java代码实战-002)

多线程(十一AQS原理-ReentrantLock的条件队列Condition)