ReentrantReadWriteLock 源码分析
Posted zhuxudong
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ReentrantReadWriteLock 源码分析相关的知识,希望对你有一定的参考价值。
ReentrantReadWriteLock
1)获取顺序:
非公平模式(默认):连续竞争的非公平锁可能无限期地推迟一个或多个 reader 或 writer 线程,但吞吐量通常要高于公平锁。
公平模式:当某个线程释放当前保持的锁时,可以为等待时间最长的单个 writer 线程分配写入锁,如果有一组等待时间大于所有正在等待的 writer 线程的 reader 线程,则将为该组分配读取锁。
2)重入:此锁允许 reader 和 writer 按照 ReentrantLock 的样式重新获取读取锁或写入锁。
3)锁降级:重入还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。
4)锁获取的中断:读取锁和写入锁都支持锁获取期间的中断。
5)同步状态值的高 16 位保存读取锁被持有的次数,低 16 位保存写入锁被持有的次数。
创建实例
/** 内部类实现的读锁 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 内部类实现的写锁 */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** 实现锁操作的同步器 */
final Sync sync;
/**
* 创建一个非公平的读写锁实例
*/
public ReentrantReadWriteLock() {
this(false);
}
/**
* 1)fair=true,创建一个公平的读写锁实例。
* 2)fair=false,创建一个非公平的读写锁实例。
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
// 基于当期实例创建读锁和写锁
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
读锁获取:ReadLock#lock
/**
* 1)如果写锁没有被其他线程持有,则成功获取读锁并返回
* 2)写锁被其他线程持有,则进入阻塞状态。
*/
@Override
public void lock() {
sync.acquireShared(1);
}
AbstractQueuedSynchronizer#acquireShared
/**
* 在共享模式下获取锁,忽略线程中断。
*/
public final void acquireShared(int arg) {
// 尝试获取共享锁
if (tryAcquireShared(arg) < 0) {
// 再次获取共享锁
doAcquireShared(arg);
}
}
ReentrantReadWriteLock#Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/**
* 高 16 位记录写锁持有次数,低 16 位记录读锁持有次数
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = 1 << Sync.SHARED_SHIFT;
static final int MAX_COUNT = (1 << Sync.SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << Sync.SHARED_SHIFT) - 1;
/** 读锁被持有的计数值 */
static int sharedCount(int c) { return c >>> Sync.SHARED_SHIFT; }
/** 写锁被持有的计数值 */
static int exclusiveCount(int c) { return c & Sync.EXCLUSIVE_MASK; }
/**
* 每个线程的读锁保持计数器
*/
static final class HoldCounter {
int count; // initially 0
// Use id, not reference, to avoid garbage retention
final long tid = LockSupport.getThreadId(Thread.currentThread());
}
/**
* 持有读锁计数器的线程局部对象
*/
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
@Override
public HoldCounter initialValue() {
return new HoldCounter();
}
}
/**
* 当前线程持有的读锁计数器对象,在创建时初始化,当读锁释放时移除
*/
private transient ThreadLocalHoldCounter readHolds;
/**
* 最近一个成功获取读锁的线程的读锁持有计数器
*/
private transient HoldCounter cachedHoldCounter;
/**
* 第一个获取读锁的线程
*/
private transient Thread firstReader;
/**
* 第一个获取读锁的线程持有读锁的计数值
*/
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
/**
* 非公平同步器
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
@Override
boolean writerShouldBlock() {
return false; // writers can always barge
}
@Override
boolean readerShouldBlock() {
// 避免获取写锁的线程饥饿
return apparentlyFirstQueuedIsExclusive();
}
}
AbstractQueuedSynchronizer#apparentlyFirstQueuedIsExclusive
/**
* 同步队列中第一个等待获取锁的线程,需要获取独占的写锁,则返回 true
*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
// 头结点、第二个节点都不为 null,节点处于独占模式,并且节点上有驻留线程,表示有线程在等待获取写锁。
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
ReentrantReadWriteLock#Sync#tryAcquireShared
@Override
@ReservedStackAccess
protected final int tryAcquireShared(int unused) {
// 读取当前线程
final Thread current = Thread.currentThread();
// 读取同步状态
final int c = getState();
/**
* 1)如果写锁已经被线程持有,并且不是当前线程,则获取失败,返回 -1。
*/
if (Sync.exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current) {
return -1;
}
/**
* 2)写锁未被任何线程持有,或写锁被当前线程持有。
*/
// 读取读锁计数值
final int r = Sync.sharedCount(c);
/**
* 获取读锁的线程是否应该被阻塞【同步队列第一个阻塞线程在等待获取写锁】
* && 读锁的占用计数值 < 65535
* && 比较设置读锁的新计数值
*/
if (!readerShouldBlock() &&
r < Sync.MAX_COUNT &&
compareAndSetState(c, c + Sync.SHARED_UNIT)) {
// 1)如果是第一个获取读锁的线程
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 2)当前线程是第一个获取读锁的线程,并在重复获取读锁
} else if (firstReader == current) {
// 累积读锁持有计数值
firstReaderHoldCount++;
// 3)当前线程不是第一个获取读锁的线程
} else {
// 读取最近一个成功获取读锁的线程的读锁持有计数器
HoldCounter rh = cachedHoldCounter;
// 如果最近获取读锁的线程不是当前线程
if (rh == null ||
rh.tid != LockSupport.getThreadId(current)) {
// 获取当前线程的持有计数器
cachedHoldCounter = rh = readHolds.get();
} else if (rh.count == 0) {
readHolds.set(rh);
}
// 递增计数值
rh.count++;
}
return 1;
}
// 写锁已经被当前线程持有,正在获取读锁
return fullTryAcquireShared(current);
}
/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
// 读取同步状态
final int c = getState();
// 1)有线程持有写锁
if (Sync.exclusiveCount(c) != 0) {
// 如果不是当前线程持有,则返回 -1
if (getExclusiveOwnerThread() != current)
{
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
}
// 2)写锁没有被任何线程持有
} else if (readerShouldBlock()) {
// Make sure we‘re not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current)) {
rh = readHolds.get();
// 读锁持有计数值为 0,则移除计数器
if (rh.count == 0) {
readHolds.remove();
}
}
}
if (rh.count == 0) {
return -1;
}
}
}
// 读锁的获取计数值已经为最大值
if (Sync.sharedCount(c) == Sync.MAX_COUNT) {
throw new Error("Maximum lock count exceeded");
}
// 比较更新读锁的计数值
if (compareAndSetState(c, c + Sync.SHARED_UNIT)) {
// 1)当前线程是第一个获取读锁的线程
if (Sync.sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 2)当前线程是第一个获取读锁的线程,重复获取
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null) {
rh = cachedHoldCounter;
}
if (rh == null ||
rh.tid != LockSupport.getThreadId(current)) {
rh = readHolds.get();
} else if (rh.count == 0) {
readHolds.set(rh);
}
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
AbstractQueuedSynchronizer#doAcquireShared
/**
* 在共享模式下获取锁,线程能响应中断
*/
private void doAcquireShared(int arg) {
// 创建一个共享节点
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
for (;;) {
// 读取前置节点
final Node p = node.predecessor();
// 前置节点是头节点
if (p == head) {
// 尝试获取锁
final int r = tryAcquireShared(arg);
/**
* 第一个等待获取写锁的线程已经成功获取写锁,并且已经使用完毕而释放,
* 当前线程成功获取读锁。
*/
if (r >= 0) {
// 设置当前节点为新的头节点,并尝试将信号往后传播,唤醒等待获取读锁的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
// 当前节点的驻留线程需要被阻塞,则阻塞当前线程
if (AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(p, node)) {
interrupted |= parkAndCheckInterrupt();
}
}
} catch (final Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted) {
AbstractQueuedSynchronizer.selfInterrupt();
}
}
}
AbstractQueuedSynchronizer#setHeadAndPropagate
/**
* 设置头节点,如果同步队列中有等待获取读锁的线程,则尝试唤醒
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 读取当前节点的后置节点,如果其为 null 或处于共享模式【等待获取读锁】
final Node s = node.next;
if (s == null || s.isShared()) {
// 尝试释放后继节点
doReleaseShared();
}
}
}
AbstractQueuedSynchronizer#doReleaseShared
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
for (;;) {
// 读取头节点
final Node h = head;
// 头结点和尾节点不相等,表示有线程在等待获取锁
if (h != null && h != tail) {
// 读取头节点的同步状态
final int ws = h.waitStatus;
// 后继节点需要被唤醒
if (ws == Node.SIGNAL) {
// 比较更新同步状态
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
{
// 更新失败则再次确认
continue; // loop to recheck cases
}
// 同步状态成功更新为 0,则唤醒后继节点的驻留线程
unparkSuccessor(h);
}
// 同步状态为 0,并且比较更新为 PROPAGATE 失败,则继续循环
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
{
continue; // loop on failed CAS
}
}
/**
* 1)后继节点被唤醒并且还未成功获取到锁,则直接退出循环,此时只唤醒了一个线程。
* 2)被唤醒的后继节点成功获取到读锁,驻留线程已经被释放,此时头节点已经改变,则进行重试。
*/
if (h == head) {
break;
}
}
}
读锁释放:ReadLock#unlock
/**
* 释放读锁,如果读锁的持有计数值为 0,则写锁可以被获取。
*/
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
@ReservedStackAccess
protected final boolean tryReleaseShared(int unused) {
// 读取当前线程
final Thread current = Thread.currentThread();
// 1)当前线程是获取读锁的第一个线程
if (firstReader == current) {
// 读锁持有计数为 1,则释放后为 0,
if (firstReaderHoldCount == 1) {
firstReader = null;
} else {
// 读锁被当前线程多次持有,则递减读锁持有计数值
firstReaderHoldCount--;
}
// 2)当前线程不是持有读锁的线程
} else {
// 最近持有读锁的线程计数值
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current)) {
// 读取当前线程的读锁持计数值对象
rh = readHolds.get();
}
final int count = rh.count;
if (count <= 1) {
// 如果为 1,则移除线程局部对象
readHolds.remove();
if (count <= 0) {
throw Sync.unmatchedUnlockException();
}
}
// 递减计数值
--rh.count;
}
// 原子更新同步状态值
for (;;) {
final int c = getState();
final int nextc = c - Sync.SHARED_UNIT;
if (compareAndSetState(c, nextc)) {
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
}
写锁获取:WriteLock#lock
ReentrantReadWriteLock#Sync
@Override
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 读取同步状态值
final int c = getState();
// 写锁被持有的计数值
final int w = Sync.exclusiveCount(c);
// 1)读锁或写锁至少有一个被线程持有
if (c != 0) {
/**
* 2)读锁被线程持有,或当前线程不是写锁持有线程。
* 目标线程不允许先获取读锁,后获取写锁,即 ReentrantReadWriteLock 不支持锁升级。
*/
if (w == 0 || current != getExclusiveOwnerThread()) {
// 尝试获取失败
return false;
}
// 写锁重入次数超出最大值
if (w + Sync.exclusiveCount(acquires) > Sync.MAX_COUNT) {
throw new Error("Maximum lock count exceeded");
}
// 更新同步状态,写锁获取成功
setState(c + acquires);
return true;
}
// 读锁和写锁都未被线程持有,则原子更新同步状态
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires)) {
return false;
}
// 设置写锁的独占线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
写锁释放:WriteLock#unlock
/**
* 释放锁,写锁已经被释放,则返回 true
*/
@Override
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
// 写锁是否被当前线程持有
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 计算同步状态值,写锁计数值保存在低 16 位
final int nextc = getState() - releases;
// 新的写锁计数值为 0,则表示读写锁已经自由了
final boolean free = Sync.exclusiveCount(nextc) == 0;
if (free) {
// 清空独占锁持有线程
setExclusiveOwnerThread(null);
}
setState(nextc);
return free;
}
以上是关于ReentrantReadWriteLock 源码分析的主要内容,如果未能解决你的问题,请参考以下文章
Java多线程——ReentrantReadWriteLock源码阅读