ReentrantLock源码分析
Posted 无虑的小猪
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ReentrantLock源码分析相关的知识,希望对你有一定的参考价值。
一、ReentrantLock介绍
ReentrantLock是JDK1.5引入的,实现Lock接口的互斥锁。保证多线程的环境下,共享资源的原子性。与Synchronized的非公平锁不同,ReentrantLock的实现公平锁、非公平锁。ReentrantLock是重入锁,重入是指,同一个线程可以重复执行加锁的代码段。
二、ReentrantLock使用
ReentrantLock功能与Synchronized一样,使用起来比Synchronized更加灵活。与Synchronized自动加锁、释放锁不同,ReentrantLock需要手动加锁、释放锁。使用示例如下:
import java.util.concurrent.locks.ReentrantLock; public class TestReentrantLock public static void main(String[] args) ReentrantLock lock = new ReentrantLock(); lock.lock(); try // todo finally lock.unlock();
三、ReentrantLock原理
Synchronized基于对象实现;ReentrantLock基于 AQS + CAS 实现。
3.1、lock()流程图
ReentrantLock基于抽象队列同步器AQS + CAS 实现的加锁、释放锁。ReentrantLock实现了公平锁、非公平锁,公平锁与非公平锁唯一的区别在于,非公平锁不会判断等待队列中是否节点等待获取锁,而是直接尝试获取锁,获取不到,再将当前线程节点添加进等待队列的尾节点,判断当前线程节点是否挂起。
3.2、unlock()流程图
四、ReentrantLock源码分析
1、构造函数
private final Sync sync; // 默认使用非公平锁 public ReentrantLock() sync = new NonfairSync(); // fair=true,公平锁;否则,非公平锁 public ReentrantLock(boolean fair) sync = fair ? new FairSync() : new NonfairSync();
Sync是ReentrantLock的抽象静态内部类,继承自AQS(AbstractQueuedSynchronizer) - 抽象队列同步器,AQS中定义了锁的基本行为,AQS中用volatile修饰的state表示当前锁重入的次数。
01 Sync类图:TODO
NonfairSync、FairSync是ReentrantLock的静态内部类,继承ReentrantLock$Sync,NonfairSync实现非公平锁,FairSync实现公平锁。
2、加锁
1、lock()
private final Sync sync; // 加锁 public void lock() sync.lock();
1.1、公平锁
调用AQS的acquire方法。ReentrantLock$FairSync#lock() 核心代码:
// 加锁 final void lock() acquire(1);
1.2、非公平锁
通过CAS尝试获取锁(将AQS的state由0修改为1),若成功,代表当前线程获取锁资源成功;若失败调用AQS的acquire方法。ReentrantLock$NonfairSync#lock() 核心代码:
// 加锁 final void lock() // 获取锁资源,CAS 修改 AQS 的 state 属性值,,获取成功,设置当前线程 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); // 获取失败,执行AQS的acquire else acquire(1);
2、acquire()
acquire()方法是Sync父类AQS中的方法,AbstractQueuedSynchronizer#acquire() 核心代码:
// 获取锁资源 public final void acquire(int arg) // 尝试获取锁资源 if (!tryAcquire(arg) && // 当前线程为获取到锁资源,加入等待队列,同时挂起线程,等待唤醒 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
1、调用tryAcquire方法:
2、调用addWaiter方法
3、调用acquireQueued方法
2.1、tryAcquire()
tryAcquire()方法在FairSync、NonFairSync中均有实现,尝试获取锁资源,核心代码如下:
// 公平锁 FairSync#tryAcquire() 方法 protected final boolean tryAcquire(int acquires) // 获取当前线程 final Thread current = Thread.currentThread(); // 获取AQS的 state int c = getState(); // state == 0 当前没有线程占用锁资源 if (c == 0) // 判断是否有线程在排队,若有线程在排队,返回true if (!hasQueuedPredecessors() && // 尝试抢锁 compareAndSetState(0, acquires)) // 无线程排队,将线程属性设置为当前线程 setExclusiveOwnerThread(current); return true; // state != 0 有线程占用锁资源 // 占用锁资源的线程是否为当前线程 else if (current == getExclusiveOwnerThread()) // state + 1 int nextc = c + acquires; // 锁重入超出最大限制 (int的最大值),抛异常 if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 将 state + 1 设置给 state setState(nextc); // 当前线程拿到锁资源,返回true return true; return false; // 非公平锁 NonFairSync#tryAcquire() 方法 protected final boolean tryAcquire(int acquires) return nonfairTryAcquire(acquires); // 非公平锁 Sync#nonfairTryAcquire() 方法 final boolean nonfairTryAcquire(int acquires) // 获取当前线程 final Thread current = Thread.currentThread(); // 获取AQS的 state int c = getState(); // 无线程占用锁资源 if (c == 0) // CAS 修改 state 的值,修改成功,设置线程属性为当前线程,返回占用锁资源标识 if (compareAndSetState(0, acquires)) setExclusiveOwnerThread(current); return true; // 有线程占用锁资源 // 占用锁资源的线程是当前线程(重入) else if (current == getExclusiveOwnerThread()) // AQS 的 state + acquires int nextc = c + acquires; // 超出锁重入的上限(int的最大值),抛异常 if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 将 state + acquires 设置到 state 属性 setState(nextc); return true; return false;
获取当前线程、AQS的state。AQS的state属性值为0,表示无线程占用锁资源,判断等待队列中是否有线程在排队,若有线程在排队,返回尝试抢锁失败标识,将线程添加进等待队列中。
2.2、addWaiter()
为当前线程创建入队节点AbstractQueuedSynchronizer$Node,入参mode表示锁类型,在AQS的静态内部类Node中有SHARE、EXCLUSIVE两个属性,SHARE代表共享锁、EXCLUSIVE代表排它锁。
AbstractQueuedSynchronizer#addWaiter() 核心代码:
// 等待队列的尾节点,懒加载,只能通过enq方法添加节点 private transient volatile Node tail; private Node addWaiter(Node mode) // 当前线程、获取的锁类型封装为Node对象 Node node = new Node(Thread.currentThread(), mode); // 获取等待队列的尾节点 Node pred = tail; // 尾节点不为null if (pred != null) // 将当前节点设置为等待队列的尾节点 node.prev = pred; if (compareAndSetTail(pred, node)) pred.next = node; return node; // 等待队列为空,初始化等待队列节点信息 enq(node); // 返回当前线程节点 return node;
等待队列不为空,将当前线程封装的Node节点添加进队列尾部;若等待队列为空,先初始化等待队列,然后在将Node节点添加进队列尾部。
2.2.1、enq()
等待队列尾节点为空时,执行enq()方法初始化等待队列,并将Node节点添加进等待队列中。
private Node enq(final Node node) for (;;) // 获取等待队列的尾节点 Node t = tail; // 等待队列为空,初始化等待队列 if (t == null) // 初始化等待队列头尾节点 if (compareAndSetHead(new Node())) tail = head; else // 当前线程的Node添加到等待队列中 node.prev = t; if (compareAndSetTail(t, node)) t.next = node; return t;
2.3、acquireQueued()
当前线程是否挂起,AbstractQueuedSynchronizer#acquireQueued() 核心代码:
final boolean acquireQueued(final Node node, int arg) // 获取锁资源标识 boolean failed = true; try boolean interrupted = false; // 自旋 for (;;) // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 当前节点的前驱节点为头节点,并获取锁资源成功 if (p == head && tryAcquire(arg)) // 将当前节点设置到head - 头节点 setHead(node); // 原头节点的下一节点指向设置为null,GC回收 p.next = null; // 设置获取锁资源成功 failed = false; // 不管线程GC return interrupted; // 如果当前节点不是head的下一节点,获取锁资源失败,尝试将线程挂起 if (shouldParkAfterFailedAcquire(p, node) && // 线程挂起, UNSAFE.park() parkAndCheckInterrupt()) interrupted = true; finally if (failed) cancelAcquire(node);
查看当前排队的Node是否是head的next, 如果是,尝试获取锁资源, 如果不是或者获取锁资源失败那么就尝试将当前Node的线程挂起(unsafe.park())。
shouldParkAfterFailedAcquire检查并更新未成功获取锁资源的状态,返回true表示线程被挂起。
AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire() 核心代码:
static final class Node // 线程被取消 static final int CANCELLED = 1; // 等待队列中存在待被唤醒的挂起线程 static final int SIGNAL = -1; // 当前线程在Condition队列中,未在AQS对列中 static final int CONDITION = -2; // 解决JDK1.5的BUG。共享锁在释放资源后,若头节点为0,无法确定真的没有后继节点 // 如果头节点为0,需要将头节点的状态改为 -3 ,当最新拿到锁资源的线程查看 // 是否有后继节点并且为当前锁为共享锁,需唤醒排队的线程。 static final int PROPAGATE = -3; // 获取锁资源失败,挂起线程 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) // 获取当前节点的上一个节点的状态 int ws = pred.waitStatus; // 上一节点被挂起 if (ws == Node.SIGNAL) // 返回true,挂起当前线程 return true; if (ws > 0) // 上一节点被取消,获取最近的线程挂起节点, // 并将当前节点的上一节点指向最近的线程挂起节点 do node.prev = pred = pred.prev; while (pred.waitStatus > 0); // 最近线程挂起节点的下一节点指向当前节点 pred.next = node; else // 上一节点状态小于等于0,存在线程处于等待状态,但未被挂起的场景 // 通过CAS将处于等待的线程挂起,避免在挂起前节点获取到锁资源 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 返回true,不挂起当前线程 return false;
在挂起线程前,确认当前节点的上一个节点的状态。若为1,代表是取消的节点,不能挂起;若为-1,代表后续节点中有挂起的线程;若为-2 (线程在等待队列 - Condition队列中)、-3 (避免线程无法唤醒的一个状态),需要将状态改为-1之后,才能挂起当前线程。
3、释放锁
1、unlock()
释放锁,ReentrantLock#unlock() 核心代码:
// 释放锁 public void unlock() sync.release(1);
unlock方法实际调用的是AQS的release方法,AbstractQueuedSynchronizer#release() 核心代码:
// 等待队列的头节点,懒加载,通过setHead方法初始化 private transient volatile Node head; // 释放锁 public final boolean release(int arg) // 当前线程释放锁资源的计数值 if (tryRelease(arg)) // 当前线程玩去释放锁资源,获取等待队列头节点 Node h = head; if (h != null && h.waitStatus != 0) // 唤醒等待队列中待唤醒的节点 unparkSuccessor(h); // 完全释放锁资源 return true; // 当前线程未完全释放锁资源 return false;
1.1、tryRelease()
释放锁,Reenttrant$Sync#tryRelease()的核心代码
// 释放锁 protected final boolean tryRelease(int releases) // 修改 AQS 的 state int c = getState() - releases; // 当前线程不是持有锁的线程,抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 是否成功的将锁资源完全释放标识 (state == 0) boolean free = false; // 锁资源完全释放 if (c == 0) // 修改标识 free = true; // 将占用锁资源的属性设置为null setExclusiveOwnerThread(null); // state赋值 setState(c); // 返回true表示当前线程完全释放锁资源; // 返回false标识当前线程是由锁资源,持有计数值减少 return free;
源码分析ReentrantLock实现原理
ReentrantLock基本用法
ReentrantLock重入锁,区别于Synchronized关键,ReentrantLock是在Java Api层面上的锁实现。和Synchronized关键相比,ReentrantLock提供了更高级的功能,例如指定是否为公平锁、限时等待、响应中断等。
常用方法
ReentrantLock lock = new ReentrantLock(); // 构造函数,默认非公平锁;如要指定公平,传入true
lock.lock(); // 获取锁,如果没有获取成功,则进入等待
lock.unlock(); // 释放锁,需要事先获取到锁
lock.lockInterruptibly(); // 可中断的获取锁,需要接收一个中断通知
lock.tryLock(); // 尝试获取锁,获取成功返回true,失败立即返回false,不进行等待
lock.tryLock(1000, TimeUnit.MILLISECONDS); // 带时间的尝试,获取失败,等待指定时间
lock.isLocked(); // 判断该锁失败是锁定状态,state!=0
lock.isHeldByCurrentThread(); // 判断锁是否被当前线程持有 exclusiveOwnerThread == Thread.currentThread
lock.getHoldCount(); // 锁进入的次数,获取的是state的值,每进入一次,state+1
lock.isFair(); // 是否为公平
类结构
ReentrantLock的源码实现
下面将从非公平锁、公平锁、释放锁等实现源码来分析。
非公平锁获取锁的实现
ReentrantLock的默认实现是非公平锁。
public ReentrantLock() {
sync = new NonfairSync();
}
所以非公平锁的lock
方法实现,在NonfairSync
类中。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
NonfairSync
的lock
方法实现中,首先会通过一个CAS的操作设置state的值,如果此时刚好有线程释放锁且当前线程仍然持有cpu时间片,那么就会进行插队(非公平性)拿到这边锁,并将锁的独占线程设置为当前线程。如果获取锁失败,则通过acquire
方法再次进行尝试。
acquire实现
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire
方法有好几个逻辑,包括tryAcquire
尝试获取锁、addWaiter
创建等待节点、acquireQueued
在队列内获取锁等操作。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 1
if (compareAndSetState(0, acquires)) { // 2
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 3
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1、首先判断锁的状态是否为未锁定状态,
2、如果是,则通过CAS操作,设置锁的状态值为acquires。并设置锁的独占线程为当前线程。
3、如果锁已经被占,则判锁的独占线程是否为当前线程。如果是,则锁的状态值累加acquires。因为ReentrantLock是可重入锁,所以会判断锁的状态值是否溢出,防止某个线程陷入死循环的加锁。
tryAcquire
方法会返回获取锁成功或失败,如果成功,那么线程获取到锁。lock
方法结束。否则就需要将当前线程入队,进行排队等待获取锁。
acquireQueued实现
acquireQueued
方法是在队列内部获取锁的实现,在执行它的逻辑之前, 先要执行addWaiter
方法,创建等待节点。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
Node
是AbstractQueuedSynchronizer
的内部类。AQS是抽象队列同步器,它在内部维护了一个称之为CLH的双向队列,队列的每个元素是Node,Node中包含了当前线程、等待状态、前驱后继节点等信息。
static final class Node {
static final Node SHARED = new Node(); // 表示节点是共享模式
static final Node EXCLUSIVE = null; // 表示节点是独占模式
static final int CANCELLED = 1; // 表示该节点的线程处于取消状态
static final int SIGNAL = -1; // 表示该节点的下一个节点需要被挂起
static final int CONDITION = -2; // 表示该节点等待一个condition唤醒
static final int PROPAGATE = -3; // 表示下一个acquireShared无条件传播
volatile int waitStatus; // 等待状态,如果是Sync节点,默认值为0,如果是condition队列的节点,默认职位CONDITION(-2)。使用CAS进行原子的更新
volatile Node prev; // 指向前驱节点,当前节点依赖它检查状态
volatile Node next; // 指向后继节点,当前节点释放是需要把它唤醒
volatile Thread thread; // 当前Thread
Node nextWaiter; // 执行下一个等待condition条件的节点,
}
Node的状态说明:
SINGLE
后继节点处于阻塞状态(被挂起),当前节点在释放锁或者取消获取锁后,必须唤醒后继节点。为了避免竞争,acquire方法必须首先表明需要一个single信息,然后原子的去重试,如果重试失败,继续阻塞。
CANCELLED
当前节点由于超时或者中断被取消。所有的节点不会丢弃这个状态,而且,一个处于取消状态的节点永远不会再次进入阻塞。
CONDITION
当前节点处于一个condition队列,不会用于sync队列。
PROPAGATE
releaseShared需要传播到其它节点,这是在
doReleaseShared
方法中设置的,以确保能传播。
关于prev和next
prev
prev指向前驱节点,当前节点依赖它做状态检查。在入队的时候被赋值,出队的时候被置为null。如果prev节点被取消,需要为当前节点往前找一个未取消的节点,而且一定是可以找到的,因为头节点不可能处于Cancelled状态。一个节点只有成功获取到了锁,才能成为头节点。一个被取消的节点不可能成功的拿到锁,一个线程只能取消自身,而不能取消其它节点。
next
next指向后继节点,当前节点的线程释放锁的时候,需要唤醒它。入队时的时候被赋值,当要避开被取消的节点时需要调整next指向,出队时被置为null。
上面介绍了AQS中Node类的重要属性,我们继续回到addWaiter
方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 1
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) { // 2
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); // 3
return node;
}
addWaiter
方法接收一个表示Node模式的操作,这里讲的是ReentrantLock,所以mode是独占模式。
1、首先会创建一个Node节点,thread指向当前线程,mode为独占模式。
2、将tail为节点赋值给pred,如果pred不为null,那么就表示当前等待队列已经被初始化,并且已经设置好了header节点。因为是处于多线程环境,所以会通过CAS的操作去设置尾节点(tail)的值。
3、如果pred为null,也就是尾节点tail为null,那么就表示当前等待队列还未被初始化,那么调用enq
方法进行队列的创建。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) // 1
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // 2
t.next = node;
return t;
}
}
}
}
这里使用了自旋的操作,在for循环里不断的尝试,把node节点入队。这里使用自旋,是因为是多线程环境,可能存在多个线程都在对队列进行初始化。
1、使用乐观锁进行设置队列的头。注意,队列的头节点没有存储Thread信息,头节点是一个虚拟节点,不参与锁竞争,初始状态值waitStatus=0;
2、如果在第一步设置头节点失败,那么就意味着其它线程在当前线程之前初始化好了队列,那么当前线程就进入队列尾部。
当前线程入队之后,需要在队列内部继续进行获取锁的操作,这个逻辑在acquireQueued
方法中。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 表示获取锁成功与否
try {
boolean interrupted = false; // 表示线程是否被中断
for (;;) {
final Node p = node.predecessor(); // 当前线程所在节点的前驱节点
if (p == head && tryAcquire(arg)) { // 1
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 2
interrupted = true;
}
} finally {
if (failed) // 3
cancelAcquire(node);
}
}
1、判断p是否为头节点,前面讲到头节点是一个虚拟节点,不参与锁竞争。然后调用tryAcquire
方法尝试获取锁。如果获取锁成功,设置头节点为当前线程的节点,在设置之前,会将node中的Thread信息置为nul。
2、如果上一步获取锁还是失败(因为是非公平锁,有可能比新来的线程抢占了),那么调用shouldParkAfterFailedAcquire
方法看是否需要将当前线程挂起。如果不需要挂起,那么继续进入自旋尝试获取锁。如果需要被挂起,那么调用parkAndCheckInterrupt
方法将线程挂起,并检测线程是否中断。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 1
return true;
if (ws > 0) { // 2
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 3
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
1、只有当当前节点的前驱节点的waitStatus=Node.SINGAL时,返回true,表示当前线程需要等待一个唤醒条件,那么当前节点的线程可以挂起。
2、如果waitStatus>0,那么表示当前节点的前驱节点状态为CANCELLED,那么需要为当前节点需要一个非CANCELLED的节点作为前驱节点。
3、表示waitStatus要么是0,要么是PROPAGATE. 表示当前线程需要一个信号,但是先不挂起,在挂起之前再重试一次。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
在执行了LockSupport.park(this)
这行代码之后,线程就会进入阻塞。要么被唤醒,要么被中断。如果被唤醒,返回false,否则返回true;Thread.interrupted()
会返回中断标识并清除中断标识。
我们再回到acquireQueued
方法。
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
如果线程被唤醒,那么interrupted还是false,否则会被置为true,表示当前线程被中断,但是当前线程并不退出锁的竞争,而是继续尝试获取锁(不响应中断),在成功获取锁之后,在返回中断标识。
最后在回到acquire
方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
在tryAcquire
获取锁失败,并且acquireQueued
方法返回true(中断标识为true),则执行selfInterrupt
方法,中断线程。
注意,Thread.interrupted
方法只是返回中断标识,且会清除中断标识,真正中断线程是调用Thread类的interrupt
方法。
cancelAcquire实现
acquireQueued
方法在finally里面还有判断failed的逻辑,如果failed为true,那么就会执行cancelAcquire
方法,那么来看下该方法做了那么工作。
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null; // 1
Node pred = node.prev;
while (pred.waitStatus > 0) // 2
node.prev = pred = pred.prev;
Node predNext = pred.next; // 3
node.waitStatus = Node.CANCELLED; // 4
if (node == tail && compareAndSetTail(node, pred)) { // 5
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) { // 6
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else { // 7
unparkSuccessor(node);
}
node.next = node; // help GC // 8
}
}
1、首先将当前节点的thread置为null。
2、然后从当前节点往前找,找到状态不为cancelled的节点,将当前节点的prev指向该节点。
3、取出剔除cancelled状态之后的前驱节点的后继节点,其实predNext指向的是当前节点。
4、将当前节点的waitStatus值置为CANCELLED。
前4步执行完之后,当前节点所处的位置就有三种可能,在头节点的下一个节点、在尾节点、既不是头节点的下一个节点也不是尾节点。5,6,7的是这三种情况的说明。
5、是尾节点,并且通过CAS操作尾节点成功(因为是多线程环境,可能存在新的线程入队了,那么当前节点就不会是尾节点)。然后将pred的next节点置为null。
6、既不是头节点的后继节点,也不是尾节点。那么需要确保将当前节点的前驱节点状态为SIGNAL,且前驱节点的Thread信息不能为null。取出当前节点的后继节点,赋值给next,如果next不为null且next的状态值不为CANCELLED,那么就设置当前节点的前驱节点的后继节点为next。最后设置将当前节点的后继节点指向自己。
7、是头节点的后继节点。因为当前节点是头节点的后继节点,当前节点要取消获取锁操作,那么就需要唤醒它的后继节点。调用unparkSuccessor
方法。最后设置将当前节点的后继节点指向自己。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 如果ws<0将当前node节点的状态值设为0
Node s = node.next; // s指向当前节点的后继节点
if (s == null || s.waitStatus > 0) { // 如果节点为空,或者后继节点也是CANCELLED状态
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) // 从尾节点开始往前找,找到第一个状态不为CANCELLED的节点,赋值给s
s = t;
}
if (s != null) // 如果找到的节点不为空,则唤醒该节点的Thread
LockSupport.unpark(s.thread);
}
从上面的对节点的处理可以看出,每种情况都不会对操作当前节点的前驱节点。这是因为在前面分析的
shouldParkAfterFailedAcquire
方法,判断节点是否需要挂起的逻辑中,如果当前处理的节点的前驱节点状态为CANCELLED,将CANCELLED状态的节点处理掉。这样的话,就降低了cancelAcquire
方法的实现复杂度。
非公平锁流程概括
公平锁的实现
ReentrantLock要指定为公平锁,需要在构造函数传入执行公平性参数。
ReentrantLock fairLock = new ReentrantLock(true); // 默认为非公平锁,true表示为公平锁
公平锁的lock方法内部实现是调用了acquire方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个和非公平锁一样,但是不同点是tryAcquire
方法的实现,在AQS中,该方法抛出一个UnsupportedOperationException,所以要看其子类的具体实现,这里是公平锁,所以看FairSync
的tryAcquire
的实现。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 1
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) { // 2
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 3
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1、判断锁的状态值是否是未锁定。
2、判断当前队列里是否存在排队的线程,如果当前线程前没有排队线程,则尝试通过CAS获取锁,如果获取成功,设置锁的独占线程为当前线程。
3、如果锁已经处于锁定状态,则判断占有锁的线程是否为当前线程,如果是当前线程,则对锁的state值累加acquires。表示重入过程。
和非公平锁最重要的区别在于,公平锁在获取锁之前,会判断队列中是否存在排队线程。
hasQueuedPredecessors实现
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
在分析非公平锁入队过程中,我们知道其实在初始化头节点的时候,头节点是一个虚节点,不存储线程信息,只是用于通知下一个线程。所以只有当 h != t 成立,才能说明队列中可能存在等待的线程,具体是否存在就需要看 ((s = h.next) == null || s.thread != Thread.currentThread()) 的判断:
当(s=h.next)==null 成立,说明有线程正在对队列进行初始化,因为队列是双向的,在入队的时候,新node的前驱指针先指向尾节点,在通过cas更新尾节点为新node,最后原尾节点的后继指针指向节点。所以才会出现h.next为空。
如果h.next不为空,那么就需要看h.next节点的线程是不是当前线程。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
入队分析:
公平锁的剩余实现就和非公平锁的实现是一样的了,最重要的就是在获取锁之前,公平锁会判断当前队列是否存在排队的线程。
释放锁的实现
在使用ReentrantLock时,需要显示的释放锁,而且一般都是在finally代码块中释放。如果没有正确的释放锁,那么就会产生死锁。
ReentrantLock lock = new ReentrantLock();
try {
lock.lock();
// do somthing...
} finally {
lock.unlock();
}
下面就从源码的角度分析释放锁的过程。ReentrantLock的lock方法会调用AQS的release方法实现。
public final boolean release(int arg) {
if (tryRelease(arg)) { // 1
Node h = head;
if (h != null && h.waitStatus != 0) // 2
unparkSuccessor(h);
return true;
}
return false;
}
1、首先会调用tryRelease方法更新锁的状态值。tryRelease具体有AQS的子类Sync实现。
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 将锁的状态值减掉对应释放锁传的值
if (Thread.currentThread() != getExclusiveOwnerThread()) // 判断释放锁的线程是不是占有锁的线程,如果不是就抛异常,防止错误的释放锁,所以只有当占用锁了,才能进行释放锁
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 只有当锁的状态值为0时,才表示锁被释放了。ReentrantLock是可重入锁,所以在使用时重入了多少次,就需要释放多少次。
free = true;
setExclusiveOwnerThread(null); // 将锁的占有线程置为null
}
setState(c); // 更新锁的值
return free;
}
2、当在队列中获取到锁之后,那么头节点会更新为获取到锁的线程节点,并将线程信息置为null。现在当前线程要释放锁,所以要唤醒它的下一个节点。
unparkSuccessor
方法在上面已经分析过了,简单来说就是找到队列中一个状态waitStatus值不为CANCELLED的节点,然后将它唤醒。
可中断的获取锁实现
在获取锁的过程中,如果线程被中断了,线程会感知到,然后退出锁的竞争,并抛出异常。
ReentrantLock lock = new ReentrantLock();
lock.lockInterruptibly();
lockInterruptibly
方法会调用AQS的acquireInterruptibly
实现:
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
tryAcquire
方法在上面已经分析过了,这里不在说明。主要看doAcquireInterruptibly
方法:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 响应中断
}
} finally {
if (failed)
cancelAcquire(node);
}
}
和非响应中断不同的区别在于,如果线程被中断了,就会抛出一个InterruptedException
。当然在抛出异常之前,会执行finally代码块的逻辑,此时failed的值就是true了,那么就会执行cancelAcquire
方法,该方法在上面也分析过了。
tryLock的实现
tryLock
表示仅仅是try一下,不管能不能获取到锁都会立即返回。带时间参数的表示带时间的尝试,获取到锁立即返回,没有则等待一段时间再返回。
ReentrantLock lock = new ReentrantLock();
lock.tryLock();
lock.tryLock(1000, TimeUnit.MILLISECONDS);
tryLock
方法调用 Sync的nonfairTryAcquire
实现。
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
NonfairTryAcquire
是非公平锁的获取实现,和非公平锁不同的时候,如果获取失败就返回,不入队。
带时间的tryLock方法调用AQS的tryAcquireNanos
方法实现
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
当线程被中断,抛出InterruptedException
。tryAcquire的实现,要看锁是公平的还是非公平锁,具体就是FairSync和NonFairSync的实现。如果tryAcquire获取失败就看doAcquireNanos
能否成功。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L) // 如果等待时间已经到了,直接返回false
return false;
final long deadline = System.nanoTime() + nanosTimeout; // 记录等待截止时间
final Node node = addWaiter(Node.EXCLUSIVE); // 将当前线程入队等待
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) // 等待的时间已经到了,返回获取锁失败,failed=true
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold) // 如果需要挂起并且剩余等待时间大于最大自旋时间。就将线程挂起
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted()) // 如果线程被中断了,那就抛出异常
throw new InterruptedException();
}
} finally {
if (failed) // 线程超时获取锁失败,或者被中断,那么就要退出队列
cancelAcquire(node);
}
}
END。
以上是关于ReentrantLock源码分析的主要内容,如果未能解决你的问题,请参考以下文章