ReentrantReadWriteLock详解
Posted truestoriesavici01
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ReentrantReadWriteLock详解相关的知识,希望对你有一定的参考价值。
ReentrantReadWriteLock详解
简介
特点:
- ReentrantReadWriteLock允许多个读线程同时访问,不允许写线程和读线程,写线程和写线程同时访问.
- 一般情况下,共享数据的读操作远多于写操作,比ReentrantLock提供更好的并发性和吞吐量.
- 读写锁内部维护两个锁:
- 读操作
- 写操作
必须保证获取读锁的线程可以看到写入锁之前版本所做的更新.
功能:
- 支持公平和非公平的获取锁的方式.
- 支持可重入.读线程在获取读锁后还可以获得读锁.写线程获取写锁后可以再次获得写锁或者读锁.
- 允许从写入锁降级为读取锁:先获取写锁,再获取读锁,最后释放写锁.不允许从读锁升级到写锁.
- 读锁和写锁都支持锁获取期间的中断.
- Condition支持.写入锁提供一个Condition实现,读取锁不支持Condition(抛出UnsupportedOperationException).
使用示例
在更新缓存后进行锁降级操作
class CachedData{
Object data; // 缓存的数据
volatile boolean cacheValid; // 缓存有效性标识
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData(){
rwl.readLock().lock(); // 先获取读锁
if(!cacheValid){ // 若缓存过期,释放读锁,获取写锁
rwl.readLock().unlock();
rwl.writeLock().lock();
try{
if(!cacheValid){ // 对缓存进行更新
// 数据更新操作
cacheValid = true;
}
rwl.readLock().lock(); // 获得读锁
} finally{
rwl.writeLock().unlock(); // 释放写锁
}
}
try{
// 使用缓存数据
} finally{
rwl.readLock().unlock(); // 获取读锁
}
}
}
用来提高并发性能
在使用集合的情况下,当集合很大,并且读线程远多于写线程时.
class RWDictoary{
private final Map<String,Data> m = new TreeMap<String,Data>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public Data get(String key){
r.lock();
try{ return m.get(key);}
finally{r.unlock();}
}
public String[] allKeys(){
r.lock();
try{ return m.keySet().toArray();}
finally { r.unlock();}
}
public Data put(String key, Data value){
w.lock();
try{return m.put(key,value);}
finally{w.unlock();}
}
public void clear(){
w.lock();
try{m.clear();}
finally{w.unlock();}
}
}
实现原理
- 基于AQS实现.
- 自定义同步器使用同步状态(整型变量
state
)维护多个读线程和一个写线程的状态. - 使用按位切割使用状态变量:
state
的高16位表示读,低16位表示写.
域
包含两把锁:
readerLock
:读锁writerLock
:写锁
写锁的获取与释放
获取
// ReentrantReadWriteLock中的内部类中的lock()
public void lock() {
sync.acquire(1);
}
// AbstractQueuedSynchronizer中的acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// ReentrantReadWriteLock中内部类Sync的tryAcquire()
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
流程:
- 若当前有线程获取读锁或写锁,则检查:
- 若没有线程获取写锁(即:有线程已获取读锁)或者当前线程非独占写锁 == > 获取写锁失败
- 否则检查获取写锁后写锁总量有没有超过限定,超过抛出异常.
- 若没有超过,则更新锁的状态,获取锁成功.
- 若当前没有线程获取读锁或写锁,则:
- 检查写线程是否需要阻塞,若要阻塞,则阻塞写线程,获取写锁失败.
- 否则将当前线程设为独占写锁 == > 获取写锁成功.
获取写锁:
注:
对于writeShouldBlock()
表示是否需要阻塞,NofairSync
和FairSync
的实现不同:
NofairSync
:直接返回false,不需要阻塞,可以插队.FairSync
:若有前驱节点,则返回true,需要阻塞,否则返回false.
释放
// ReentrantReadWriteLock中WriteLock的unlock()
public void unlock() {
sync.release(1);
}
// AbstractQueuedSynchronizer中的release()
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// ReentrantReadWriteLock中的内部类Sync中的tryRelease()
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
// ReentrantReadWriteLock中内部类Sync中的unparkSuccessor()
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
流程:
尝试释放锁:
- 释放成功 == > 唤醒后继线程 == > 释放锁成功
- 释放失败
尝试释放锁:
- 当前线程是否为独占线程:
- 否 == > 释放失败
- 是 == > 检查队列状态
state
:
- 队列状态为0:
- 为0(无线程持有锁) == > 设独占线程为null == > 释放成功
- 非0 == > 释放成功
唤醒后继线程:
- head线程是否可运行
- 可运行 == > 设为待运行
- head线程的后继节点为null或被取消请求:
- 从后先前寻找最靠前的非null节点,作为后继节点.
- 若后继节点非null,则唤醒该节点
释放写锁:
读锁的获取和释放
获取
// ReentrantReadWriteLock中内部类ReadLock中的lock()
public void lock() {
sync.acquireShared(1);
}
// AQS的acquireShared()
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// ReentrantReadWriteLock的内部类Sync的tryAcquireShared()
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we‘re not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
// AQS的doAcquireShared()
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
流程:
首先试图获取读锁:
- 若存在写锁并且当前线程为持有写锁 == > 获取读锁失败
- 若当前线程无需阻塞并且持有的读锁小于最大值并且CAS更新状态:
- 若为第一个读线程,设置其为
firstReader
. - 若当前线程已获取读锁,则更新线程状态(+1).
- 若非第一个读线程并且首次获取读锁,则更新读线程缓存和线程状态.
- 若为第一个读线程,设置其为
- 若获取锁失败,则:循环上述步骤尝试获取读锁。
若读锁获取失败,则:
- 检查前驱节点是否是head,若不是则,阻塞自己,等待唤醒后重试。若被中断,则取消请求锁。
- 若前驱节点是head,则尝试获取锁。若获取成功,则设置下一节点为待处理的线程。若获取失败,则循环重试。
释放
// ReentrantReadWriteLock的内部类ReadLock中的unlock()
public void unlock() {
sync.releaseShared(1);
}
// AQS中的releaseShared()
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// ReentrantReadWriteLock的内部类Sync中的tryReleaseShared()
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 释放读锁对于读线程没有影响,但是若读写锁均被释放,则写线程可以得到处理
return nextc == 0;
}
}
// AQS的doReleaseShared()
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
流程:
- 若当前线程为第一个读线程,则
- 若读线程数为1,则设置第一个读线程为null
- 否则第一个读线程数减一
- 若非第一个读线程,则更新缓存中的信息
- 循环尝试:更新锁状态
state
,更新成功则返回。
锁升降级
锁降级:写锁降级为读锁。
流程:线程持有写锁的同时,获取到读锁,获取成功后释放写锁。
必要性:为了保证数据的可见性。若当前线程不获取读锁而是直接释放写锁,则其他线程获取写锁并修改数据时,当前线程无法感知到数据的修改。而使用锁降级,则其他线程会被阻塞,直到当前线程已经获取读锁后才能有可能获得写锁。
注:ReentrantReadWriteLock不支持锁升级(读锁==>写锁),为了保证数据的可见性。
参考:
以上是关于ReentrantReadWriteLock详解的主要内容,如果未能解决你的问题,请参考以下文章