22.读写锁ReetrantReadWriteLock
Posted 纵横千里,捭阖四方
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了22.读写锁ReetrantReadWriteLock相关的知识,希望对你有一定的参考价值。
前面我们介绍过读写锁的用法,本节我们详细分析读写锁的实现过程。ReetrantReadWriteLock由于要实现读读不互斥、读写互斥、写写互斥,根据前面对AQS的分析,貌似读读场景使用共享锁能搞定,而后两者基于独占锁能搞定。事实也确实如此,两个就是基于AQS实现的。具体来说为了实现读写锁,ReetrantReadWriteLock类的内部分别提供了两个实现类:
ReadLock:读锁
WriteLock:写锁。
分析ReetrantReadWriteLock类的关系图,会发现其的结构比较复杂:
图中除了AQS和两个接口Lock和ReadWriteLock,其他的类,例如Sync、ReadLock和Write类以及公平和非公平锁都是在ReetrantReadWriteLock类里实现的。
而ReadLock和WriteLock依赖于Sych同步类,从类的关系看,Sync中重写了AQS的独占和共享四个方法。
如果当前线程调用ReadLock.lock()方法,则实际会调用Sync中的tryAcquireShared()方法来实现共享锁竞争。如果当前线程调用WriteLock.lock(),则实际会调用到Sync中的tryAcquire()方法竞争独占锁。
1 WriteLock锁
接下来,我们就看一下写锁和读锁的实现原理,首先看写锁。参考ReadWriteLockExample例子,当我们执行writeLock.lock();方法时,最终也是会执行到AQS的如下位置:
public final void acquire(int arg)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
这个代码与我们前面介绍的重入锁是一致的,但是里面的几个方法的实现有不少差异,因此还是需要再看一次。
1.1 锁竞争过程
tryAcquire()方法的实现是在ReadWriteLock类里:
protected final boolean tryAcquire(int acquires)
Thread current = Thread.currentThread();
int c = getState();//获取当前的互斥变量的值
//获得写线程的数量
int w = exclusiveCount(c);
if (c != 0)
// 如果 if c != 0 and w == 0 说明有其他线程获得了共享锁,此时写操作不能执行
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 线程重入
setState(c + acquires);
return true;
//如果c==0,说明读锁和写锁都没有被线程获取
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
这里的核心思想与重入锁一致,都是通过state互斥量来竞争锁资源,但是具体实现过程有很大区别,具体过程是:
-
1.通过getState()方法获取当前的互斥变量的值。 如果 c != 0 and w == 0 说明有其他线程获得了共享锁,此时写操作不能执行 。
-
2.通过exclusiveCount()方法查找当前获得写锁的线程数量,由于写锁是互斥的,所以如果能够获得多个,就说明只能是重入的情况。则通过w+exclusiveCount(acquires)进行次数的累加,这里有一个最大重入允许次数65535。
-
3.如果c==0,说明读锁和写锁都没有被线程获取,通过writeShouldBlock()方法判断写锁是否应该阻塞,在非公平模式下,写锁不需要先阻塞,而是通过compareAndSetState()方法竞争锁。
1.2 写锁数量如何获得的
在上面的代码中,判断写锁的数量是通过这一行进行的:
if (w + exclusiveCount(acquires) > MAX_COUNT)
他的实现与其他的貌似不太一样:
static int exclusiveCount(int c) return c & EXCLUSIVE_MASK;
EXCLUSIVE_MASK又是什么,其定义是这样的:
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
这里其实就是定义EXCLUSIVE_MASK=2^16-1=01111111111111(二进制)= 65535。之所以使用移位,主要还是为了计算效率。
这里是如何获得的呢?这是因为state采用高低位分开存储读锁和写锁,如下图所示。
在上面的图中,高16位存储读锁,当前读锁状态为10,表示有两个线程获得了读锁。低16位存储写锁状态,当前锁状态为100,表示有一个线程重入了4次。
exclusiveCount()方法采用位运算得到state低16位的值,并以该值来判断当前state的重入次数,代码如下:
static int exclusiveCount(int c)return c&EXCLUSIVE_MASK;
这种计数方式在很多代码中都能看到,是个很常用的技巧。
1.3释放锁的过程
我们继续看锁释放的过程:
protected final boolean tryRelease(int releases)
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
释放过程与重入锁的基本一致,通过getState()-release来递减锁的次数,由于写锁的重入次数保存在低位,所以直接按十进制计算即可。通过exclusiveCount()方法计算写锁的重入次数,如果为0,则说明锁释放成功
WriteLock锁竞争失败的逻辑,和前文分析的ReetrantLock锁竞争逻辑也是一致,这里不再赘述。
2 ReadLock锁
分析完了写锁,我们再来看一下读锁是如何工作的。允许多个线程同时读,通过AQS的共享锁来实现。
2.1 锁竞争过程
共享锁的竞争会调用tryAcquireShared()方法,tryAcquireShared()方法如果返回-1,则表示需要等待其他写锁释放,否则表示当前没有线程持有锁,可以直接获得读锁。
protected final int tryAcquireShared(int unused)
Thread current = Thread.currentThread();//获得当前线程
int c = getState();//获取当前线程
//如果写锁或独占锁的持有者不是当前线程,则直接阻塞
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);//读锁数量
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT))
if (r == 0) //表示第一次获取读锁
//保存第一个获取读锁的线程
firstReader = current;
firstReaderHoldCount = 1;
//表示读锁的重入
else if (firstReader == current)
firstReaderHoldCount++;
else //保存每个线程读锁的重入次数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
return 1;
return fullTryAcquireShared(current);
我们来分析一下上面的逻辑过程:
1.首先,(exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)的作用是判断是否有其他线程获得写锁。如果有写操作则读线程直接进入等待状态。
2.通过sharedCount()方法获得读锁的数量。其实现也是通过位移,详见前面小节“写锁是如何获得的”。
3.接下来,如果当前读锁(通过readerShouldBlock()实现)不需要等待,并且读线程数量没超过65535,就直接通过compareAndSetState()方法如果readerShouldBlock返回false,表示当前锁不需要等待。
4.读锁获取成功后,处理还有些麻烦,这里会根据不同的条件进行处理:
-
r==0,表示第一次获得锁。此时保存第一次获得读锁的线程。
-
firstReader==current,表示当前读线程是第一个获得读锁的,此时需要增加重入次数。
-
如果是其他情况, 则用readHolds来保存每个线程获得锁的次数。如果查看readHolds的定义,这里使用的是ThreadLocal 来保存的,其功能是每个读线程可以独立管理自己的重入次数。该问题我们在ThreadLocal部分介绍。
5.如果CAS失败,则调用fullTryAcquireShared()方法尝试获取共享锁。
在ReetrantReadWriteLock读锁和写锁的获取过程中,在通过CAS修改互斥变量的状态之前,会分别调用readerShouldBlock()方法和writedShouldBlock()方法来判断是否可以通过CAS尝试获得锁,这两个方法的实现再公平和非公平模式中的实现不同。
对公平锁来说,readShouldBlock和writeShouldBlock都会通过hasQueuedPredecessor判断当前同步队列中年是否还有排队的线程,也就是对公平锁而言,只有同步队列中当前结点之前没有等待的线程 ,才会先尝试抢占锁,代码如下:
static final class FairSync extends Sync
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock()
return hasQueuedPredecessors();
final boolean readerShouldBlock()
return hasQueuedPredecessors();
hasQueuedPredecessors()方法就是一个简单的链表判断,代码如下:
public final boolean hasQueuedPredecessors()
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
对非公平锁来说,writerShouldBlock()方法直接返回false,也就是说在默认情况下都会先去抢占写锁,代码如下:
static final class NonfairSync extends Sync
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock()
return false; // writers can always barge
final boolean readerShouldBlock()
return apparentlyFirstQueuedIsExclusive();
而readerShouldBlock()方法则使用apparentlyFirstQueuedIsExclusive()来判断:
final boolean apparentlyFirstQueuedIsExclusive()
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
这个方法的目的是避免写锁无限等待的问题,试想一下,如果一直有线程在获得读锁,那意味着写锁将一直无法获得,极端情况下将会一直导致写锁一直等待下去。为了避免这个问题,apparentlyFirstQueuedIsExclusive是这么做的:
-
如果当前同步队列head节点的下一个结点是独占锁结点,那么该方法会返回true,表示当前来获取锁的线程需要排队。
-
如果当前同步队列head结点的下一个结点是共享锁结点,那么该方法会返回false,表示当前来获得读锁的线程允许通过CAS修改互斥锁的状态。
这种设计在一定程度上避免了写锁无限等待的情况。
另外,在tryAcquireShard()方法中,当通过CAS抢占到读锁时,出了通过state变量记录总的读锁次数,还使用HoldCounter以线程为单位记录每个线程获得读锁的次数。之所以要这样设计,是因为state只能记录读线程和写线程的总数,但是无法记录每个线程获得读锁的重入次数,代码如下:
protected final int tryAcquireShared(int unused)
....
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
...
HoldCounter保存了count和tid,其中count用来记录数量,而tid用来记录当前线程的id,所以一个HoldCounter可以表示某个线程对应的重入次数,代码如下:
static final class HoldCounter
int count = 0;
final long tid = getThreadId(Thread.currentThread());
但是如果要实现线程的隔离,也就是说每个线程都有一个独立的HoldCounter实例,那么要怎么样实现呢?我们看到再记录重入数量的代码中有这样一行代码:
cachedHoldCounter = rh = readHolds.get();
每个线程进行数量统计时,都是从readHolds.get()一个HoldCounter实例,代码如下:
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter>
public HoldCounter initialValue()
return new HoldCounter();
这里采用ThreadLocal来进行线程隔离,也就是每个线程调用readHolds.get()方法,都会得到一个和当前绑定的HoldCounter对象实例,也就能实现针对每个线程记录读锁的重入次数的功能。
如果通过tryAcquireShared尝试抢占读锁失败,则还需要调用fullTryAcquireShared()方法,该方法的整体逻辑和tryAcquireShared()类似,只不过增加了自旋等待来保证读锁成功,代码如下:
final int fullTryAcquireShared(Thread current)
HoldCounter rh = null;
for (;;) //自旋
int c = getState();
//情况1:如果当前有其他线程获得写锁,并且获得写锁的咸亨不是当前线程,则返回-1
if (exclusiveCount(c) != 0)
if (getExclusiveOwnerThread() != current)
return -1;
//情况2:如果返回true,则表示当前抢占读锁的线程要等待
else if (readerShouldBlock())
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current)
// assert firstReaderHoldCount > 0;
else
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
if (rh.count == 0)
return -1;
//情况3:判断读锁的总数是否大于最大值
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//情况4
if (compareAndSetState(c, c + SHARED_UNIT))
if (sharedCount(c) == 0)
firstReader = current;
firstReaderHoldCount = 1;
else if (firstReader == current)
firstReaderHoldCount++;
else
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
return 1;
上述代码中,
-
情况1里的getExclusiveOwnerThread() != current是为了避免死锁的,如果一个线程先获得锁,在没释放写锁之前再舱室获得锁,并且直接返回-1,那么获得写锁的线程会一直无法唤醒,从而进入死锁状态。
-
情况2位置表示当前读锁应该先阻塞,再判断当前读锁是否是重入,如果是则直接抢占锁,否则阻塞。
-
情况4位置,通过cas抢占读读锁资源,成功后使用HoldCounter记录当前线程的重入次数,其代码实现与tryAcquireShared()方法相同。
上述代码中,我们要注意firstReader和firstReaderHoldCount两个字段,这两个字段会记录第一个获得读锁的先生以及该线程的重入次数,如果是第一个先生就没有必要添加到HoldCounter了,为什么要这样呢?这是一种优化,如果获得读锁的线程只有一个,就没必要从HoldCounter中查找了,这可以提供性能。
但是这里的firstReader并不是全局的第一个线程,当原本的第一个线程释放了锁之后,后序来获得读锁的线程会占用这个firstReader属性,firstReader和firstReaderHoldCount可以在读锁不产生竞争的情况下快速记录读锁的重入次数。
最后,在fullTryAcquireShared()方法中,在如下两个情况下需要加入到同步队列进行等待:当前有其他线程获得了写锁并且当前线程不是重入。readerShouldBlock()方法返回true并不是重入。
3 读写锁过程总结
我们接下来从整体上看一下读写锁的基本过程。假设两个线程ThreadA和ThreadB先去获得读锁。此时使用firstReader和firstReaderHoldCount分别记录第一个获得读锁的线程以及线程重入的次数。ThreadB获得读锁,用HoldCounter记录当前线程的重入次数。
接着ThreadC来抢占写锁,由于此时有ThreadA和ThreadB持有读锁,因此ThreadC抢占写锁失败,直接加到同步阻塞队列中。此时假如又来了D和E来抢占写线程。
假如此时再有两个线程ThreadD和ThreadE来抢占读锁,由于不满足直接读锁的条件,所以需要加入到同步队列。注意读锁加入队列中的节点类型是SHARED,表示共享节点。SHARED节点有个特点:如果其中一个被唤醒,那么其他类型的SHARED结点也都会被唤醒,也就是允许多个线程同时抢读锁。这也是读写锁的特性,当写锁释放之后,要唤醒所有等待的读锁来读取最新的数据。
如果ThreadA和ThreadB这两个获得读锁的线程释放了锁,就会从AQS队列中唤醒头部结点的下一个结点,也就是ThreadC线程,该线程是来抢占锁的,当该线程获得锁之后的结构如下。ThreadC这个Node结点变成Head结点,原来的head结点从链表中移除,然后ThreadC竞争到互斥锁,所以state的低位为1,exclusiveOwnerThread=ThreadC。
最后,如果ThreadC的读锁也释放了,那么需要从AQS的同步队列中继续唤醒head结点的next结点,也就是ThreadD线程所在的结点,在唤醒的过程中发现该结点类型是SHARED,由于共享锁的结点允许多个线程竞争到锁,所以继续往后查找类型为SHARED的结点进行唤醒,如果下一个结点的类型是EXCLUSIVE(独占锁),则中断共享锁的传递,不再往后继续唤醒SHARED结点。
以上是关于22.读写锁ReetrantReadWriteLock的主要内容,如果未能解决你的问题,请参考以下文章