JDK类库源码分析系列2-AbstractQueuedSynchronizer-ReentrantReadWriteLock
Posted _微风轻起
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK类库源码分析系列2-AbstractQueuedSynchronizer-ReentrantReadWriteLock相关的知识,希望对你有一定的参考价值。
一、简介说明
这一篇我们来介绍下AbstractQueuedSynchronizer
的另一个应用类ReentrantReadWriteLock
。与之对应的是类似前面一篇的ReentrantLock
,不过ReentrantLock
其是以独占的模式获取锁,目前我们这个ReentrantReadWriteLock
是有两用的,一种是读锁,也就是共享锁,另一种是Write
写锁,也就是独占锁。
下面我们就来具体梳理下这个类。
这里我们可以通过一个很简单的例子来梳理逻辑。
public class ReentrantWriteReadLockMain {
public static void main(String[] args) {
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
new Thread(new Runnable() {
@Override
public void run() {
readLock.lock();
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
readLock.lock();
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
readLock.lock();
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
readLock.lock();
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
System.out.println("Hello ...........");
}
}
在这个基础上,可以看自己想梳理什么过程,再进行相应的变动。
一、基本结构
1、ReentrantReadWriteLock
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
其有实现ReadWriteLock
接口。
2、ReadWriteLock
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
这个接口其定义了两个方法,一个获取读锁,一个获取写锁。
二、变量
1、readerLock
private final ReentrantReadWriteLock.ReadLock readerLock;
读锁对象。
2、writerLock
private final ReentrantReadWriteLock.WriteLock writerLock;
写锁对象。
3、sync
final Sync sync;
Sync
同步对象。
三、Sync子类
1、结构
abstract static class Sync extends AbstractQueuedSynchronizer {
其也是直接继承的AQS
。
2、变量
1)、计算值
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count. */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count. */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
我们将这4个变量与两个获取值的方法放在一起,这里的SHARED_SHIFT
是一个转换基数。
SHARED_UNIT
这个是被ReadLock
读锁用来计数的,也就是设置status
的值,每个获取锁的线程分别计数。其的初始值是1 << SHARED_SHIFT
,也就是往前移动16未,然后其是往前高位累加的,也就是说其初始值是1 << SHARED_SHIFT
,然后同一线程获取第二次就是SHARED_UNIT + SHARED_UNIT
,也就是高位+1
,这里要理解,然后要获取其的计算就是sharedCount(int c)
方法,也就是c >>> SHARED_SHIFT
将其再往后移16位。
MAX_COUNT
是单个线程最大的获取status
。
EXCLUSIVE_MASK
这个是用于独占锁计数的,其的初始值是c & EXCLUSIVE_MASK
,也就是其从低位到高位都是1
,然后其status
的获取是通过exclusiveCount(int c)
(c & EXCLUSIVE_MASK
),这里是&
运算,其是从低位0
到高位。
这里用左移、右移、且这些计算是能加快速度,然后共享与独占一个高位计数一个低位,这样就能通过status
完成两种模式即Read
、Write
的总计数了。
2)、readHolds
private transient ThreadLocalHoldCounter readHolds;
这个就是前面的ThreadLocalHoldCounter
,被读锁使用,如果线程的status
为0,就会将其remove
。
3)、cachedHoldCounter
private transient HoldCounter cachedHoldCounter;
缓存值,指向最后使用的线程对应的HoldCounter
。
4)、firstReader
private transient Thread firstReader;
指向队首的线程。
5)、firstReaderHoldCount
private transient int firstReaderHoldCount;
队首的计数。
3、方法
1)、readerShouldBlock()
abstract boolean readerShouldBlock();
读是否需要被阻塞。
2)、writerShouldBlock()
abstract boolean writerShouldBlock();
写是否需要被阻塞。
3)、tryAcquire(int acquires)
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
尝试获取独占锁,整体与我们前面文章的流程差不多。
这里是先判断c != 0
也就是有没有已经获取锁,如果有就是这个分支:判断w == 0
,这里我们要注意为什么c !=0
但w == 0
,因为我们有提到,高位计数是用于共享的,而低位才是用于独占的,这个分支就是如果共享有获取,但目前独占锁还没有获取、或当前线程并不是获取锁的线程就返回false
获取失败。
如果独占已经有获取了,并且目前独占线程就是当前线程,判断是否有超过最大限制MAX_COUNT
,没有超过就通过setState(c + acquires)
设置status
。返回获取成功。
如果最开始c ==0
,也就是共享独占都没有获取。先通过writerShouldBlock()
判断是否阻塞,如果true
或者CAS
设置status
竞争失败,我们返回false
。如果不满足这些条件,就表示我们CAS
设置status
成功了,就可以设置独占线程为当前线程了,因为我们是最开始获取的c = = 0
。
4)、tryRelease(int releases)
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
尝试释放锁,首先是通过isHeldExclusively()
,判断当前线程是不是占有线程,然后判断本次释放后status
释放为0,如果是就表示释放完成,就通过setExclusiveOwnerThread(null)
将独占线程置为null
,最后通过setState(nextc)
设置status
。
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
5)、tryAcquireShared(int unused)
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
这里我们首先明白:
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
这里的exclusiveCount(c) != 0
,也就是独占不为0
(共享是高位,则低位&
运算必定为0)。也就是说,如果有独占写锁,并且这个写锁线程不是当前线程,则返回-1
表示获取失败。如果当前ReentrantReadWriteLock
还没有用到写锁
,则就往后继续。
判断如果不需要readerShouldBlock()
并且目前的共享计数sharedCount(c)
没有超过MAX_COUNT
,就能通过compareAndSetState(c, c + SHARED_UNIT)
设置status
了,c + SHARED_UNIT
就是高位计数,用于共享读锁计数统计。然后如果设置成功,则判断,如果其sharedCount(c)
为0
,也就是还没有共享获取,就将其设置为firstReader
指向头,firstReaderHoldCount
设置为1
。如果其不为0
,也就是现在有共享获取,就判断头线程是不是当前线程,是的话就自增。如果也不是本身,就获取本次的上次计数缓存cachedHoldCounter
,如果最开始为null
(rh == null
),或者不是当前线程,则通过readHolds
(ThreadLocal
)获取到当前线程的HoldCounter
,然后计数rh.count++
,最后返回1
表示获取成功。
同时我们看到上面这些是单个流程,如果这个单个获取失败,例如CAS
设置status
获取失败,就通过下面的fullTryAcquireShared(current)
来满获取,也就是for(;;)
。
6)、fullTryAcquireShared(Thread current)
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
这个大概的与上面是相似的,主要是for (;;)
,这里就不具体分析了。
7)、tryReleaseShared(int unused)
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
共享释放,这个流程也比较简单,如果头线程是当前线程,当firstReaderHoldCount
计数为1,就表示其已经要释放完了,就置为null
,不是就自减。
不是当前线程。就是cachedHoldCounter
、readHolds
使用,也是判断是否需要readHolds.remove()
,并--rh.count
。最后再是for(;;)
设置status
。
8)、tryWriteLock()
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
尝试写锁获取,写锁是独占锁,所以如果current != getExclusiveOwnerThread()
,或还没有独占,就返回false
。如果为status == 0
,就CAS
设置status
如果成功就设置当前线程为独占线程,如果失败就false
。
9)、tryReadLock()
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
尝试读锁获取,这里大体与上面的 以上是关于JDK类库源码分析系列2-AbstractQueuedSynchronizer-ReentrantReadWriteLock的主要内容,如果未能解决你的问题,请参考以下文章 JDK类库源码分析系列2-AbstractQueuedSynchronizer-CountDownLatch JDK类库源码分析系列2-AbstractQueuedSynchronizer-ReentrantReadWriteLocktryAcquireSh