AQS源码剖析第二篇--公平与非公平,条件队列和线程中断
Posted 热爱编程的大忽悠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS源码剖析第二篇--公平与非公平,条件队列和线程中断相关的知识,希望对你有一定的参考价值。
AQS源码剖析第二篇--公平与非公平,条件队列和线程中断
系列文章:
公平锁和非公平锁
ReentrantLock 默认采用非公平锁,除非你在构造方法中传入参数 true 。
public ReentrantLock()
sync = new NonfairSync();
public ReentrantLock(boolean fair)
sync = fair ? new FairSync() : new NonfairSync();
公平锁和非公平锁的的区别在tryAcquire方法
public void lock()
sync.acquire(1);
//aqs框架为我们提供的模板方法实现
public final void acquire(int arg)
//tryAcquire是抽象方法,需要子类实现
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
- 公平锁的实现
static final class FairSync extends Sync
private static final long serialVersionUID = -3000897897090466540L;
@ReservedStackAccess
protected final boolean tryAcquire(int acquires)
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0)
//1. 和非公平锁相比,这里多了一个判断:是否有线程在等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires))
setExclusiveOwnerThread(current);
return true;
else if (current == getExclusiveOwnerThread())
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
return false;
- 非公平锁的实现
static final class NonfairSync extends Sync
private static final long serialVersionUID = 7316153563782823691L;
protected final boolean tryAcquire(int acquires)
return nonfairTryAcquire(acquires);
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires)
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0)
// 这里没有对阻塞队列进行判断
if (compareAndSetState(0, acquires))
setExclusiveOwnerThread(current);
return true;
else if (current == getExclusiveOwnerThread())
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
return false;
公平锁就比非公平锁多了一个在抢锁前,是否需要让存在于已经存在于阻塞队列中的线程具有更高优先级。
相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。
Condition
我们先来看看 Condition 的使用场景,Condition 经常可以用在生产者-消费者的场景中,这里简单给出一个例子:
package com.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
class BoundedBuffer
final Lock lock = new ReentrantLock();
/**
* 当队列满的时候,通知生产者等待; 队列不满的时候,通知消费者消费
*/
final Condition notFull = lock.newCondition();
/**
* 当队列为空,通知消费者等待; 队列不为空,通知生产者生产
*/
final Condition notEmpty = lock.newCondition();
/**
* 资源
*/
final List<Long> resources=new ArrayList<>();
/**
* 可保留未被消费的最大资源数
*/
final Integer MAX_RESOURCES_NUM=3;
public void put() throws InterruptedException
lock.lock();
try
Thread.sleep(5000L);
while (MAX_RESOURCES_NUM.equals(resources.size()))
notFull.await(); // 队列已满,等待,直到 not full 才能继续生产
Long res=System.currentTimeMillis();
log.info("生产者当前生产的资源为: ",res);
resources.add(res);
notEmpty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去
finally
lock.unlock();
public void take() throws InterruptedException
lock.lock();
try
Thread.sleep(5000L);
while (resources.size() == 0)
notEmpty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费
Long res = resources.remove(0);
log.info("消费者当前消费掉的资源为: ",res);
notFull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去
finally
lock.unlock();
1、我们可以看到,在使用 condition 时,必须先持有相应的锁。这个和 Object 类中的方法有相似的语义,需要先持有某个对象的监视器锁才可以执行 wait(), notify() 或 notifyAll() 方法。
2、ArrayBlockingQueue 采用这种方式实现了生产者-消费者,所以请只把这个例子当做学习例子,实际生产中可以直接使用 ArrayBlockingQueue
我们常用 obj.wait(),obj.notify() 或 obj.notifyAll() 来实现相似的功能,但是,它们是基于对象的监视器锁的。而这里说的 Condition 是基于 ReentrantLock 实现的,而 ReentrantLock 是依赖于 AbstractQueuedSynchronizer 实现的。
在往下看之前,读者心里要有一个整体的概念。condition 是依赖于 ReentrantLock 的,不管是调用 await 进入等待还是 signal 唤醒,都必须获取到锁才能进行操作。
每个 ReentrantLock 实例可以通过调用多次 newCondition 产生多个 ConditionObject 的实例:
final ConditionObject newCondition()
// 实例化一个 ConditionObject
return new ConditionObject();
我们首先来看下我们关注的 Condition 的实现类 AbstractQueuedSynchronizer 类中的 ConditionObject。
public class ConditionObject implements Condition, java.io.Serializable
private static final long serialVersionUID = 1173984872572414699L;
// 条件队列的第一个节点
// 不要管这里的关键字 transient,是不参与序列化的意思
private transient Node firstWaiter;
// 条件队列的最后一个节点
private transient Node lastWaiter;
......
在上一篇介绍 AQS 的时候,我们有一个阻塞队列,用于保存等待获取锁的线程的队列。这里我们引入另一个概念,叫条件队列(condition queue),我画了一张简单的图用来说明这个。
这里的阻塞队列如果叫做同步队列(sync queue)其实比较贴切,不过为了和前篇呼应,我就继续使用阻塞队列了。记住这里的两个概念,阻塞队列和条件队列。
这里,我们简单回顾下 Node 的属性:
volatile int waitStatus; // 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
prev 和 next 用于实现阻塞队列的双向链表,这里的 nextWaiter 用于实现条件队列的单向链表
基本上,把这张图看懂,你也就知道 condition 的处理流程了。所以,我先简单解释下这图,然后再具体地解释代码实现。
- 条件队列和阻塞队列的节点,都是 Node 的实例,因为条件队列的节点是需要转移到阻塞队列中去的;
- 我们知道一个 ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition实例,这里对应 condition1 和 condition2。注意,ConditionObject 只有两个属性firstWaiter 和 lastWaiter;
- 每个 condition 有一个关联的条件队列,如线程 1 调用 condition1.await() 方法即可将当前线程 1
包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行,条件队列是一个单向链表; - 调用condition1.signal() 触发一次唤醒,此时唤醒的是队头,会将condition1 对应的条件队列的
firstWaiter(队头) 移到阻塞队列的队尾,等待获取锁,获取锁后 await 方法才能返回,继续往下执行。
接下来,我们一步步按照流程来走代码分析,我们先来看看 await 方法:
// 首先,这个方法是可被中断的,不可被中断的是另一个方法 awaitUninterruptibly()
// 这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断
public final void await() throws InterruptedException
// 老规矩,既然该方法要响应中断,那么在最开始就判断中断状态
if (Thread.interrupted())
throw new InterruptedException();
// 添加到 condition 的条件队列中
Node node = addConditionWaiter();
// 释放锁,返回值是释放锁之前的 state 值
// await() 之前,当前线程是必须持有锁的,这里肯定要释放掉
int savedState = fullyRelease(node);
int interruptMode = 0;
// 这里退出循环有两种情况,之后再仔细分析
// 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了
// 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断
while (!isOnSyncQueue(node))
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 被唤醒后,将进入阻塞队列,等待获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
其实,我大体上也把整个 await 过程说得十之八九了,下面我们分步把上面的几个点用源码说清楚。
1. 将节点加入到条件队列
addConditionWaiter() 是将当前节点加入到条件队列,看图我们知道,这种条件队列内的操作是线程安全的。
// 将当前线程对应的节点入队,插入队尾
private Node addConditionWaiter()
Node t = lastWaiter;
// 如果条件队列的最后一个节点取消了,将其清除出去
// 为什么这里把 waitStatus 不等于 Node.CONDITION,就判定为该节点发生了取消排队?
if (t != null && t.waitStatus != Node.CONDITION)
// 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列
unlinkCancelledWaiters();
t = lastWaiter;
// node 在初始化的时候,指定 waitStatus 为 Node.CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// t 此时是 lastWaiter,队尾
// 如果队列为空
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
上面的这块代码很简单,就是将当前线程进入到条件队列的队尾。
在addWaiter 方法中,有一个 unlinkCancelledWaiters() 方法,该方法用于清除队列中已经取消等待的节点。
当 await 的时候如果发生了取消操作(这点之后会说),或者是在节点入队的时候,发现最后一个节点是被取消的,会调用一次这个方法。
// 等待队列是一个单向链表,遍历链表将已经取消等待的节点清除出去
// 纯属链表操作,很好理解,看不懂多看几遍就可以了
private void unlinkCancelledWaiters()
Node t = firstWaiter;
Node trail = null;
while (t != null)
Node next = t.nextWaiter;
// 如果节点的状态不是 Node.CONDITION 的话,这个节点就是被取消的
if (t.waitStatus != Node.CONDITION)
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
else
trail = t;
t = next;
2. 完全释放独占锁
回到 wait 方法,节点入队了以后,会调用 int savedState = fullyRelease(node);
方法释放锁,注意,这里是完全释放独占锁(fully release),因为 ReentrantLock 是可以重入的。
考虑一下这里的 savedState。如果在 condition1.await() 之前,假设线程先执行了 2 次 lock() 操作,那么 state 为 2,我们理解为该线程持有 2 把锁,这里 await() 方法必须将 state 设置为 0,然后再进入挂起状态,这样其他线程才能持有锁。当它被唤醒的时候,它需要重新持有 2 把锁,才能继续下去。
// 首先,我们要先观察到返回值 savedState 代表 release 之前的 state 值
// 对于最简单的操作:先 lock.lock(),然后 condition1.await()。
// 那么 state 经过这个方法由 1 变为 0,锁释放,此方法返回 1
// 相应的,如果 lock 重入了 n 次,savedState == n
// 如果这个方法失败,会将节点设置为"取消"状态,并抛出异常 IllegalMonitorStateException
final int fullyRelease(Node node)
boolean failed = true;
try
int savedState = getState();
// 这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁,将 state 置为 0
if (release(savedState))
failed = false;
return savedState;
else
throw new IllegalMonitorStateException();
finally
if (failed)
node.waitStatus = Node.CANCELLED;
考虑一下,如果一个线程在不持有 lock 的基础上,就去调用 condition1.await() 方法,它能进入条件队列,但是在上面的这个方法中,由于它不持有锁,release(savedState) 这个方法肯定要返回 false,进入到异常分支,然后进入 finally 块设置
node.waitStatus = Node.CANCELLED
,这个已经入队的节点之后会被后继的节点”请出去“。
3. 等待进入阻塞队列
释放掉锁以后,接下来是这段,这边会自旋,如果发现自己还没到阻塞队列,那么挂起,等待被转移到阻塞队列。
int interruptMode = 0;
// 如果不在阻塞队列中,注意了,是阻塞队列
while (!isOnSyncQueue(node))
// 线程挂起
LockSupport.park(this);
// 这里可以先不用看了,等看到它什么时候被 unpark 再说
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
isOnSyncQueue(Node node) 用于判断节点是否已经转移到阻塞队列了:
// 在节点入条件队列的时候,初始化时设置了 waitStatus = Node.CONDITION
// 前面我提到,signal 的时候需要将节点从条件队列移到阻塞队列,
// 这个方法就是判断 node 是否已经移动到阻塞队列了
final boolean isOnSyncQueue(Node node)
// 移动过去的时候,node 的 waitStatus 会置为 0,这个之后在说 signal 方法的时候会说到
// 如果 waitStatus 还是 Node.CONDITION,也就是 -2,那肯定就是还在条件队列中
// 如果 node 的前驱 prev 指向还是 null,说明肯定没有在 阻塞队列(prev是阻塞队列链表中使用的)
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果 node 已经有后继节点 next 的时候,那肯定是在阻塞队列了
if (node.next != null)
return true;
// 下面这个方法从阻塞队列的队尾开始从后往前遍历找,如果找到相等的,说明在阻塞队列,否则就是不在阻塞队列
// 可以通过判断 node.prev() != null 来推断出 node 在阻塞队列吗?答案是:不能。
// 这个可以看上篇 AQS 的入队方法,首先设置的是 node.prev 指向 tail,
// 然后是 CAS 操作将自己设置为新的 tail,可是这次的 CAS 是可能失败的。
return findNodeFromTail(node);
// 从阻塞队列的队尾往前遍历,如果找到,返回 true
private boolean findNodeFromTail(Node node)
Node t = tail;
for (;;)
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
回到前面的循环,isOnSyncQueue(node) 返回 false 的话,那么进到 LockSupport.park(this);
这里线程挂起。
4. signal 唤醒线程,转移到阻塞队列
为了大家理解,这里我们先看唤醒操作,因为刚刚到 LockSupport.park(this);
把线程挂起了,等待唤醒。
唤醒操作通常由另一个线程来操作,就像生产者-消费者模式中,如果线程因为等待消费而挂起,那么当生产者生产了一个东西后,会调用 signal 唤醒正在等待的线程来消费。
// 唤醒等待了最久的线程
// 其实就是,将这个线程对应的 node 从条件队列转移到阻塞队列
public final void signal()
// 调用 signal 方法的线程必须持有当前的独占锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
// 从条件队列队头往后遍历,找出第一个需要转移的 node
// 因为前面我们说过,有些线程会取消排队,但是可能还在队列中
private void doSignal(Node first)
do
// 将 firstWaiter 指向 first 节点后面的第一个,因为 first 节点马上要离开了
// 如果将 first 移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉
first.nextWaiter = null;
while (!transferForSignal(first) &&
(first = firstWaiter) != null以上是关于AQS源码剖析第二篇--公平与非公平,条件队列和线程中断的主要内容,如果未能解决你的问题,请参考以下文章