AQS源码剖析第二篇--公平与非公平,条件队列和线程中断

Posted 热爱编程的大忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS源码剖析第二篇--公平与非公平,条件队列和线程中断相关的知识,希望对你有一定的参考价值。

AQS源码剖析第二篇--公平与非公平,条件队列和线程中断


系列文章:

AQS源码剖析第一篇—全貌概览


公平锁和非公平锁

ReentrantLock 默认采用非公平锁,除非你在构造方法中传入参数 true 。

    public ReentrantLock() 
        sync = new NonfairSync();
    
    
    public ReentrantLock(boolean fair) 
        sync = fair ? new FairSync() : new NonfairSync();
    

公平锁和非公平锁的的区别在tryAcquire方法

    public void lock() 
        sync.acquire(1);
    
     //aqs框架为我们提供的模板方法实现
     public final void acquire(int arg) 
         //tryAcquire是抽象方法,需要子类实现
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    
  • 公平锁的实现
    static final class FairSync extends Sync 
        private static final long serialVersionUID = -3000897897090466540L;
      
        @ReservedStackAccess
        protected final boolean tryAcquire(int acquires) 
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) 
                //1. 和非公平锁相比,这里多了一个判断:是否有线程在等待
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) 
                    setExclusiveOwnerThread(current);
                    return true;
                
            
            else if (current == getExclusiveOwnerThread()) 
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            
            return false;
        
    
  • 非公平锁的实现
    static final class NonfairSync extends Sync 
        private static final long serialVersionUID = 7316153563782823691L;
        protected final boolean tryAcquire(int acquires) 
            return nonfairTryAcquire(acquires);
        
    
    
            @ReservedStackAccess
        final boolean nonfairTryAcquire(int acquires) 
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) 
             // 这里没有对阻塞队列进行判断
                if (compareAndSetState(0, acquires)) 
                    setExclusiveOwnerThread(current);
                    return true;
                
            
            else if (current == getExclusiveOwnerThread()) 
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            
            return false;
        

公平锁就比非公平锁多了一个在抢锁前,是否需要让存在于已经存在于阻塞队列中的线程具有更高优先级。

相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。


Condition

我们先来看看 Condition 的使用场景,Condition 经常可以用在生产者-消费者的场景中,这里简单给出一个例子:

package com.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
class BoundedBuffer 
    final Lock lock = new ReentrantLock();

    /**
     * 当队列满的时候,通知生产者等待; 队列不满的时候,通知消费者消费
     */
    final Condition notFull = lock.newCondition();

    /**
     * 当队列为空,通知消费者等待; 队列不为空,通知生产者生产
     */
    final Condition notEmpty = lock.newCondition();

    /**
     * 资源
     */
    final List<Long> resources=new ArrayList<>();

    /**
     * 可保留未被消费的最大资源数
     */
    final Integer MAX_RESOURCES_NUM=3;



    public void put() throws InterruptedException 
        lock.lock();
        try 
            Thread.sleep(5000L);

            while (MAX_RESOURCES_NUM.equals(resources.size()))
                notFull.await();  // 队列已满,等待,直到 not full 才能继续生产
            

            Long res=System.currentTimeMillis();

            log.info("生产者当前生产的资源为: ",res);

            resources.add(res);

            notEmpty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去
         finally 
            lock.unlock();
        
    


    public void take() throws InterruptedException 
        lock.lock();
        try 
            Thread.sleep(5000L);

            while (resources.size() == 0)
                notEmpty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费
            

            Long res = resources.remove(0);

            log.info("消费者当前消费掉的资源为: ",res);

            notFull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去

         finally 
            lock.unlock();
        
    

1、我们可以看到,在使用 condition 时,必须先持有相应的锁。这个和 Object 类中的方法有相似的语义,需要先持有某个对象的监视器锁才可以执行 wait(), notify() 或 notifyAll() 方法。

2、ArrayBlockingQueue 采用这种方式实现了生产者-消费者,所以请只把这个例子当做学习例子,实际生产中可以直接使用 ArrayBlockingQueue

我们常用 obj.wait(),obj.notify() 或 obj.notifyAll() 来实现相似的功能,但是,它们是基于对象的监视器锁的。而这里说的 Condition 是基于 ReentrantLock 实现的,而 ReentrantLock 是依赖于 AbstractQueuedSynchronizer 实现的。

在往下看之前,读者心里要有一个整体的概念。condition 是依赖于 ReentrantLock 的,不管是调用 await 进入等待还是 signal 唤醒,都必须获取到锁才能进行操作。

每个 ReentrantLock 实例可以通过调用多次 newCondition 产生多个 ConditionObject 的实例:

final ConditionObject newCondition() 
    // 实例化一个 ConditionObject
    return new ConditionObject();

我们首先来看下我们关注的 Condition 的实现类 AbstractQueuedSynchronizer 类中的 ConditionObject。

public class ConditionObject implements Condition, java.io.Serializable 
        private static final long serialVersionUID = 1173984872572414699L;
        // 条件队列的第一个节点
          // 不要管这里的关键字 transient,是不参与序列化的意思
        private transient Node firstWaiter;
        // 条件队列的最后一个节点
        private transient Node lastWaiter;
        ......

在上一篇介绍 AQS 的时候,我们有一个阻塞队列,用于保存等待获取锁的线程的队列。这里我们引入另一个概念,叫条件队列(condition queue),我画了一张简单的图用来说明这个。

这里的阻塞队列如果叫做同步队列(sync queue)其实比较贴切,不过为了和前篇呼应,我就继续使用阻塞队列了。记住这里的两个概念,阻塞队列和条件队列。


这里,我们简单回顾下 Node 的属性:

volatile int waitStatus; // 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;

prev 和 next 用于实现阻塞队列的双向链表,这里的 nextWaiter 用于实现条件队列的单向链表


基本上,把这张图看懂,你也就知道 condition 的处理流程了。所以,我先简单解释下这图,然后再具体地解释代码实现。

  • 条件队列和阻塞队列的节点,都是 Node 的实例,因为条件队列的节点是需要转移到阻塞队列中去的;
  • 我们知道一个 ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition实例,这里对应 condition1 和 condition2。注意,ConditionObject 只有两个属性firstWaiter 和 lastWaiter;
  • 每个 condition 有一个关联的条件队列,如线程 1 调用 condition1.await() 方法即可将当前线程 1
    包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行,条件队列是一个单向链表;
  • 调用condition1.signal() 触发一次唤醒,此时唤醒的是队头,会将condition1 对应的条件队列的
    firstWaiter(队头) 移到阻塞队列的队尾,等待获取锁,获取锁后 await 方法才能返回,继续往下执行。

接下来,我们一步步按照流程来走代码分析,我们先来看看 await 方法:

// 首先,这个方法是可被中断的,不可被中断的是另一个方法 awaitUninterruptibly()
// 这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断
public final void await() throws InterruptedException 
    // 老规矩,既然该方法要响应中断,那么在最开始就判断中断状态
    if (Thread.interrupted())
        throw new InterruptedException();

    // 添加到 condition 的条件队列中
    Node node = addConditionWaiter();

    // 释放锁,返回值是释放锁之前的 state 值
    // await() 之前,当前线程是必须持有锁的,这里肯定要释放掉
    int savedState = fullyRelease(node);

    int interruptMode = 0;
    // 这里退出循环有两种情况,之后再仔细分析
    // 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了
    // 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断
    while (!isOnSyncQueue(node)) 
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    
    // 被唤醒后,将进入阻塞队列,等待获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);

其实,我大体上也把整个 await 过程说得十之八九了,下面我们分步把上面的几个点用源码说清楚。

1. 将节点加入到条件队列

addConditionWaiter() 是将当前节点加入到条件队列,看图我们知道,这种条件队列内的操作是线程安全的。

// 将当前线程对应的节点入队,插入队尾
private Node addConditionWaiter() 
    Node t = lastWaiter;
    // 如果条件队列的最后一个节点取消了,将其清除出去
    // 为什么这里把 waitStatus 不等于 Node.CONDITION,就判定为该节点发生了取消排队?
    if (t != null && t.waitStatus != Node.CONDITION) 
        // 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列
        unlinkCancelledWaiters();
        t = lastWaiter;
    
    // node 在初始化的时候,指定 waitStatus 为 Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);

    // t 此时是 lastWaiter,队尾
    // 如果队列为空
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;

上面的这块代码很简单,就是将当前线程进入到条件队列的队尾。

在addWaiter 方法中,有一个 unlinkCancelledWaiters() 方法,该方法用于清除队列中已经取消等待的节点。

当 await 的时候如果发生了取消操作(这点之后会说),或者是在节点入队的时候,发现最后一个节点是被取消的,会调用一次这个方法。

// 等待队列是一个单向链表,遍历链表将已经取消等待的节点清除出去
// 纯属链表操作,很好理解,看不懂多看几遍就可以了
private void unlinkCancelledWaiters() 
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) 
        Node next = t.nextWaiter;
        // 如果节点的状态不是 Node.CONDITION 的话,这个节点就是被取消的
        if (t.waitStatus != Node.CONDITION) 
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        
        else
            trail = t;
        t = next;
    

2. 完全释放独占锁

回到 wait 方法,节点入队了以后,会调用 int savedState = fullyRelease(node); 方法释放锁,注意,这里是完全释放独占锁(fully release),因为 ReentrantLock 是可以重入的。

考虑一下这里的 savedState。如果在 condition1.await() 之前,假设线程先执行了 2 次 lock() 操作,那么 state 为 2,我们理解为该线程持有 2 把锁,这里 await() 方法必须将 state 设置为 0,然后再进入挂起状态,这样其他线程才能持有锁。当它被唤醒的时候,它需要重新持有 2 把锁,才能继续下去。

// 首先,我们要先观察到返回值 savedState 代表 release 之前的 state 值
// 对于最简单的操作:先 lock.lock(),然后 condition1.await()。
//         那么 state 经过这个方法由 1 变为 0,锁释放,此方法返回 1
//         相应的,如果 lock 重入了 n 次,savedState == n
// 如果这个方法失败,会将节点设置为"取消"状态,并抛出异常 IllegalMonitorStateException
final int fullyRelease(Node node) 
    boolean failed = true;
    try 
        int savedState = getState();
        // 这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁,将 state 置为 0
        if (release(savedState)) 
            failed = false;
            return savedState;
         else 
            throw new IllegalMonitorStateException();
        
     finally 
        if (failed)
            node.waitStatus = Node.CANCELLED;
    

考虑一下,如果一个线程在不持有 lock 的基础上,就去调用 condition1.await() 方法,它能进入条件队列,但是在上面的这个方法中,由于它不持有锁,release(savedState) 这个方法肯定要返回 false,进入到异常分支,然后进入 finally 块设置 node.waitStatus = Node.CANCELLED,这个已经入队的节点之后会被后继的节点”请出去“。

3. 等待进入阻塞队列

释放掉锁以后,接下来是这段,这边会自旋,如果发现自己还没到阻塞队列,那么挂起,等待被转移到阻塞队列。

int interruptMode = 0;
// 如果不在阻塞队列中,注意了,是阻塞队列
while (!isOnSyncQueue(node)) 
    // 线程挂起
    LockSupport.park(this);

    // 这里可以先不用看了,等看到它什么时候被 unpark 再说
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;

isOnSyncQueue(Node node) 用于判断节点是否已经转移到阻塞队列了:

// 在节点入条件队列的时候,初始化时设置了 waitStatus = Node.CONDITION
// 前面我提到,signal 的时候需要将节点从条件队列移到阻塞队列,
// 这个方法就是判断 node 是否已经移动到阻塞队列了
final boolean isOnSyncQueue(Node node) 

    // 移动过去的时候,node 的 waitStatus 会置为 0,这个之后在说 signal 方法的时候会说到
    // 如果 waitStatus 还是 Node.CONDITION,也就是 -2,那肯定就是还在条件队列中
    // 如果 node 的前驱 prev 指向还是 null,说明肯定没有在 阻塞队列(prev是阻塞队列链表中使用的)
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果 node 已经有后继节点 next 的时候,那肯定是在阻塞队列了
    if (node.next != null) 
        return true;

    // 下面这个方法从阻塞队列的队尾开始从后往前遍历找,如果找到相等的,说明在阻塞队列,否则就是不在阻塞队列

    // 可以通过判断 node.prev() != null 来推断出 node 在阻塞队列吗?答案是:不能。
    // 这个可以看上篇 AQS 的入队方法,首先设置的是 node.prev 指向 tail,
    // 然后是 CAS 操作将自己设置为新的 tail,可是这次的 CAS 是可能失败的。

    return findNodeFromTail(node);


// 从阻塞队列的队尾往前遍历,如果找到,返回 true
private boolean findNodeFromTail(Node node) 
    Node t = tail;
    for (;;) 
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    

回到前面的循环,isOnSyncQueue(node) 返回 false 的话,那么进到 LockSupport.park(this); 这里线程挂起。

4. signal 唤醒线程,转移到阻塞队列

为了大家理解,这里我们先看唤醒操作,因为刚刚到 LockSupport.park(this); 把线程挂起了,等待唤醒。

唤醒操作通常由另一个线程来操作,就像生产者-消费者模式中,如果线程因为等待消费而挂起,那么当生产者生产了一个东西后,会调用 signal 唤醒正在等待的线程来消费。

// 唤醒等待了最久的线程
// 其实就是,将这个线程对应的 node 从条件队列转移到阻塞队列
public final void signal() 
    // 调用 signal 方法的线程必须持有当前的独占锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);


// 从条件队列队头往后遍历,找出第一个需要转移的 node
// 因为前面我们说过,有些线程会取消排队,但是可能还在队列中
private void doSignal(Node first) 
    do 
          // 将 firstWaiter 指向 first 节点后面的第一个,因为 first 节点马上要离开了
        // 如果将 first 移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉
        first.nextWaiter = null;
     while (!transferForSignal(first) &&
             (first = firstWaiter) != null以上是关于AQS源码剖析第二篇--公平与非公平,条件队列和线程中断的主要内容,如果未能解决你的问题,请参考以下文章

ReentrantLock源码探究探究公平锁与非公平锁背后的奥秘

AQS源码剖析第一篇---全貌概览

公平锁与非公平锁源码对比

公平锁与非公平锁

公平锁与非公平锁

源码剖析:AQS-AbstractQueuedSynchronizer