JUC之ReadWriteLockReentrantReadWriteLock读写锁

Posted fondwang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC之ReadWriteLockReentrantReadWriteLock读写锁相关的知识,希望对你有一定的参考价值。

读写锁简介

  对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写操作了。

  读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁,描述如下:

读锁的条件:

  1. 没有其他线程的写锁;

  2. 对写锁请求的线程必须是同一个。

写锁的条件:

  1. 没有其他线程的读锁;

  2. 没有其他线程的写锁。

   读写锁的三个重要特性:

    ①. 公平选择权:支持非公平(默认)和公平的锁获取方式,非公平锁吞吐量由于公平锁。

    ②. 重进入:读锁和写锁都支持线程重进入。

    ③. 锁降级:遵循获取写锁、获取读锁、释放写锁的次序,写锁能够降级成为读锁。

源码解读

ReentrantReadWriteLock类的整体结构:

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {

    // 读锁 
    private final ReentrantReadWriteLock.ReadLock readerLock;

    // 写锁 
    private final ReentrantReadWriteLock.WriteLock writerLock;

    final Sync sync;
    
    // 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock 
    public ReentrantReadWriteLock() {
        this(false);
    }

    // 使用给定的公平策略创建一个新的 ReentrantReadWriteLock 
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    // 返回用于写入操作的锁 
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    
    // 返回用于读取操作的锁
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
   // 继承AQS
    abstract static class Sync extends AbstractQueuedSynchronizer {}
   // 非公平锁
    static final class NonfairSync extends Sync {}
   // 公平锁
    static final class FairSync extends Sync {}
   // 读锁
    public static class ReadLock implements Lock, java.io.Serializable {}
   // 写锁
    public static class WriteLock implements Lock, java.io.Serializable {}
}

类的继承关系

  public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {}

  ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock接口定义了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable接口,表示可以进行序列化。

类的内部类

ReentrantReadWriteLock有五个内部类,五个内部类之间也是相互关联的。

技术图片

 

 

说明:如上图所示,Sync继承AQS、NonfairSync和FairSync继承自Sync类;ReadLock和WriteLock实现了Lock接口。


 Sync类

(1)类的继承关系

 abstract static class Sync extends AbstractQueuedSynchronizer {}

  Sync抽象类继承自AQS抽象类,Sync类提供了对ReentrantReadWriteLock的支持

(2)类的构造器

Sync() {
    // 本地线程计数器 readHolds
= new ThreadLocalHoldCounter();
    // 设置AQS的状态 setState(getState());
// 确保readhold的可见性 }

 (3)类的属性

abstract static class Sync extends AbstractQueuedSynchronizer {
    // 版本序列号
    private static final long serialVersionUID = 6317671515068378041L;        
    // 高16位为读锁,低16位为写锁
    static final int SHARED_SHIFT   = 16;
    // 读锁单位。SHARED_SHIFT * 2
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 读锁最大数量
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    // 写锁最大数量
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    // 本地线程计数器
    private transient ThreadLocalHoldCounter readHolds;
    // 缓存的计数器
    private transient HoldCounter cachedHoldCounter;
    // 第一个读线程
    private transient Thread firstReader = null;
    // 第一个读线程的计数
    private transient int firstReaderHoldCount;
}

 (4)内部类

// 计数器
static
final class HoldCounter {
     // 计数
int count = 0; // Use id, not reference, to avoid garbage retention
     // 获取当前线程的TID属性的值 final long tid = getThreadId(Thread.currentThread()); }
// 本地线程计数器
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
    // 重写初始化方法,在没有进行set的情况,获取的都是该HoldCounter值
public HoldCounter initialValue() { return new HoldCounter(); } }

 HoldCounter主要有两个属性,count和tid,其中count表示某个读线程重入的次数,tid表示该线程的tid字段的值,该字段可以用来唯一标识一个线程。

ThreadLocalHoldCounter重写了ThreadLocal的initialValue方法,ThreadLocal类可以将线程与对象相关联。在没有进行set的情况下,get到的均是initialValue方法里面生成的那个HolderCounter对象。

(5)类的方法


// 返回读锁线程数量
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 返回写锁线程数量
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
说明:直接将state右移16位,就可以得到读锁的线程数量,因为state的高16位表示读锁,对应的第十六位表示写锁数量。

 写锁的获取

protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread(); //当前线程
            int c = getState();  //获取状态
            int w = exclusiveCount(c);  //写线程数量
        // 当同步状态state != 0,则已有线程获取读锁或写锁
            if (c != 0) {
                // 如果写锁状态为0,说明读锁此时被占用 则返回false;如果写锁状态不为0,且写锁没有被当前线程持有 则返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT) //判断同一线程获取写锁是否超过最大次数(65535),也算可重入
                    throw new Error("Maximum lock count exceeded");
                // 更新状态
                setState(c + acquires);
                return true;
            }
         // 到这里说明c=0,读/写锁都没有被获取。 判断是否正在阻塞 或 CAS更新状态失败
            if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);  //设置锁为当前线程所有
            return true;
        }

 获取写锁的步骤如下:

(1)首先获取c、w。c表示当前锁状态;w表示写线程数量。然后判断同步状态state是否为0。如果state!=0,说明已经有其他线程获取了读锁或写锁,执行(2);否则执行(5)。

(2)如果锁状态不为零(c != 0),而写锁的状态为0(w = 0),说明读锁此时被其他线程占用,所以当前线程不能获取写锁,自然返回false。或者锁状态不为零,而写锁的状态也不为0,但是获取写锁的线程不是当前线程,则当前线程也不能获取写锁。

(3)判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程已获取写锁,更新是线程安全的),返回true。

(4)如果state为0,此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),在非公平策略下总是不会被阻塞,在公平策略下会进行判断(判断同步队列中是否有等待时间更长的线程,若存在,则需要被阻塞,否则,无需阻塞),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,失败则说明锁被别的线程抢去了,返回false。如果需要阻塞则也返回false。

(5)成功获取写锁后,将当前线程设置为占有写锁的线程,返回true。

方法流程图:

技术图片


 写锁的释放

protected final boolean tryRelease(int releases) {
        // 锁不是当前线程持有者
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
        // 写锁的新线程数。如果重入了几次,就要执行几次释放
            int nextc = getState() - releases;
        // 如果写(独占)模式重入数为0了,说明独占模式被释放
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null); //写锁释放完成,设置锁的持有者为null
            setState(nextc);  //更新重入数
            return free;
        }

写锁释放过程:

  1. 首先查看当前线程是否为写锁的持有者,如果不是抛出异常。

  2. 然后检查释放后写锁的线程数是否为0,如果为0则表示写锁空闲了,释放锁资源将锁的持有线程设置为null,否则释放仅仅只是一次重入锁而已,并不能将写锁的线程清空。

说明:此方法用于释放写锁资源,首先会判断该线程是否为独占线程,若不为独占线程,则抛出异常,否则,计算释放资源后的写锁的数量,若为0,表示成功释放,资源不将被占用,否则,表示资源还被占用。

其方法流程图如下。

 技术图片


读锁的获取

// 从名称可见,读锁为共享锁,可被多个线程持有
protected final int tryAcquireShared(int unused) {
            // 当前线程
            Thread current = Thread.currentThread();
        // 获取状态
            int c = getState();
        // 如果写锁线程数 !=0,且独占锁不是当前线程 返回false。 因为存在锁降级
            if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
                return -1;
        // 读锁数量
            int r = sharedCount(c);
         // 读锁是否被阻塞 && 线程数小于最大值 && CAS设置成功
            if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
          // r == 0, 表示第一个读锁线程,首个读锁firstRead不会加入到readHolds中
                if (r == 0) {  //读锁数量0
                    firstReader = current; //设置第一个线程
                    firstReaderHoldCount = 1;  //读锁占用资源数为1
                } else if (firstReader == current) {  //当前线程为第一个读线程,即线程重入
                    firstReaderHoldCount++; //占用资源数加1
                } else { //读锁数量不为0,且不是当前线程
             // 获取计数器
                    HoldCounter rh = cachedHoldCounter;
             //计数器为空 || 计数器tid不是当前线程的tid
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();  //获取当前线程的计数器
                    else if (rh.count == 0)  //计数为0
                        readHolds.set(rh);  //加入readHolds中
                    rh.count++;
                }
                return 1;
            }
         //三个条件不满足(读线程是否应该被阻塞、小于最大值、比较设置成功)则会进行fullTryAcquireShared函数中,它用来保证相关操作可以成功。
            return fullTryAcquireShared(current);
        }

fullTryAcquireShared()方法

final int fullTryAcquireShared(Thread current) {
            //
            HoldCounter rh = null;
            for (;;) {
                int c = getState(); //获取状态
          //写线程数不为0
                if (exclusiveCount(c) != 0) { 
             // 不为当前线程 
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                } else if (readerShouldBlock()) { // 写线程数量为0,且读线程被阻塞
                    // 当前线程是第一个度线程
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else { //当前线程不是第一个读线程
                        if (rh == null) { //计数器为空
                            rh = cachedHoldCounter; 
                  // 计数器为空 或者 计数器的tid不为当前正在运行的线程的tid
                            if (rh == null || rh.tid != getThreadId(current)) { 
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)  // 读锁数为最大值,异常
                    throw new Error("Maximum lock count exceeded");
          // CAS成功
                if (compareAndSetState(c, c + SHARED_UNIT)) {
             // 读数量为0
                    if (sharedCount(c) == 0) {
                        firstReader = current; // 设置第一个读线程
                        firstReaderHoldCount = 1; //读线程占用资源数
                    } else if (firstReader == current) { // 读线程重入
                        firstReaderHoldCount++;
                    } else { //读锁数量不为0,并且不为当前线程
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) // 计数器为空 || 计数器的tid不为当前线程的tid
                            rh = readHolds.get();  //获取当前线程的计数器
                        else if (rh.count == 0)  //计数为0
                            readHolds.set(rh);  //加入到readHolds中
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

读写锁取锁过程:

  1. 首先判断写锁是否为0,,且当前线程不占有独占锁(写锁),之间返回;

  2. 否则,判断读线程是否被阻塞 && 读锁数小于最大值 && CAS成功,若当前没有读锁,则设置第一个读线程firstReader和firstReaderHoldCount;

  3. 若当前线程线程为第一个读线程,则增加firstReaderHoldCount;

  4. 否则,将设置当前线程对应的HoldCounter对象的值。

流程图:

技术图片

注意:更新成功后会在firstReaderHoldCount中或readHolds(ThreadLocal类型的)的本线程副本中记录当前线程重入数(23行至43行代码),这是为了实现jdk1.6中加入的getReadHoldCount()方法的,这个方法能获取当前线程重入共享锁的次数(state中记录的是多个线程的总重入次数),加入了这个方法让代码复杂了不少,但是其原理还是很简单的:如果当前只有一个线程的话,还不需要动用ThreadLocal,直接往firstReaderHoldCount这个成员变量里存重入数,当有第二个线程来的时候,就要动用ThreadLocal变量readHolds了,每个线程拥有自己的副本,用来保存自己的重入数。


读锁的释放

protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread(); //当前线程
            if (firstReader == current) { // 当前线程是否为第一个读线程
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)  //读线程占用资源数为1
                    firstReader = null;
                else  //减少占用的资源
                    firstReaderHoldCount--;
            } else {  // 当前线程不是第一个读线程
          // 获取缓存的计数器
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))  //计数器为空 || 计数器的tid不为当前正在运行的线程的tid
                    rh = readHolds.get(); // 获取当前线程的计数器
                int count = rh.count;  // 获取计数
                if (count <= 1) {  // 计数小于等于1
                    readHolds.remove(); //移除
                    if (count <= 0) //计数小于等于0,异常
                        throw unmatchedUnlockException();
                }
                --rh.count;  // 减少计数
            }
            for (;;) {
                int c = getState();  // 获取状态
                int nextc = c - SHARED_UNIT;  
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

读锁释放过程:

  1.  首先判断当前线程是否为第一个读线程firstReader,若是,则判断第一个读线程占有的资源数firstReaderHoldCount是否为1,若是,则设置第一个读线程firstReader为空。

    否则,将第一个读线程占有的资源数firstReaderHoldCount减1;

  2.  若当前线程不是第一个读线程,那么首先会获取缓存计数器(上一个读锁线程对应的计数器 ),若计数器为空或者tid不等于当前线程的tid值,则获取当前线程的计数器.

       如果计数器的计数count小于等于1,则移除当前线程对应的计数器;如果计数器的计数count小于等于0,则抛出异常,之后再减少计数即可。

流程图

技术图片

在读锁的获取、释放过程中,总是会有一个对象存在着,同时该对象在获取线程获取读锁是+1,释放读锁时-1,该对象就是HoldCounter。

要明白HoldCounter就要先明白读锁。前面提过读锁的内在实现机制就是共享锁,对于共享锁其实我们可以稍微的认为它不是一个锁的概念,它更加像一个计数器的概念。

一次共享锁操作就相当于一次计数器的操作,获取共享锁计数器+1,释放共享锁计数器-1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。

所以HoldCounter的作用就是当前线程持有共享锁的数量,这个数量必须要与线程绑定在一起,否则操作其他线程锁就会抛出异常。


对于非公平/公平内部类、读/写锁内部类的方法,大多都会转到调用Sync内部类的方法。

除内部类的方法外的其他方法,都是一些基本信息:读线程数、写线程数、是否被加读/写锁等等,不是很难 结合源码自行查看。


图解重要函数及对象关系

AQS图解

技术图片

 

 

 读写锁的加锁解锁操作

 

技术图片从图中可见操作最终都是调用ReentrantReadWriteLock类的内部类Sync提供的方法。


AQS无锁状态

 技术图片


 AQS写锁无等待状态                                      

技术图片  

AQS写锁重入状态

技术图片

 AQS写锁等待状态

技术图片


 AQS读锁无等待状态(首节点)                                

技术图片  

AQS读锁重入状态(首节点)

技术图片

AQS读锁无等待状态(非首节点)                                

 

技术图片  

AQS读锁等待状态(非首节点)

技术图片

读锁获取添加等待队列                                    写锁获取添加等待队列

技术图片  技术图片

 

 

 

 

 

参考:https://segmentfault.com/a/1190000015768003

以上是关于JUC之ReadWriteLockReentrantReadWriteLock读写锁的主要内容,如果未能解决你的问题,请参考以下文章

JUC系列Executor框架之概览

JUC系列Executor框架之CompletionFuture

JUC系列Executor框架之FutureTask

JUC系列Executor框架之线程池执行器

Java并发编程系列之三JUC概述

Java - "JUC"之Condition源码解析