深入浅出ReentrantReadWriteLock源码解析

Posted pinxiong

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入浅出ReentrantReadWriteLock源码解析相关的知识,希望对你有一定的参考价值。

读写锁实现逻辑相对比较复杂,但是却是一个经常使用到的功能,希望将我对ReentrantReadWriteLock的源码的理解记录下来,可以对大家有帮助

前提条件

在理解ReentrantReadWriteLock时需要具备一些基本的知识

理解AQS的实现原理

之前有写过一篇《深入浅出AQS源码解析》关于AQS的文章,对AQS原理不了解的同学可以先看一下

理解ReentrantLock的实现原理

ReentrantLock的实现原理可以参考《深入浅出ReentrantLock源码解析》

什么是读锁和写锁

对于资源的访问就两种形式:要么是读操作,要么是写操作。读写锁是将被锁保护的临界资源的读操作和写操作分开,允许同时有多个线程同时对临界资源进行读操作,任意时刻只允许一个线程对资源进行写操作。简单的说,对与读操作采用的是共享锁,对于写操作采用的是排他锁

读写状态的设计

ReentrantReadWriteLock是用state字段来表示读写锁重复获取资源的次数,高16位用来标记读锁的同步状态,低16位用来标记写锁的同步状态

// 划分的边界线,用16位来划分
static final int SHARED_SHIFT   = 16;
// 读锁的基本单位,也就是读锁加1或者减1的基本单位(1左移16位后的值)
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
// 读写锁的最大值(在计算读锁的时候需要先右移16位)
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
// 写锁的掩码,state值与掩码做与运算后得到写锁的真实值
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 获取资源被读锁占用的次数
static int sharedCount(int c){
      return c >>> SHARED_SHIFT;
}

// 获取资源被写锁占用的次数
static int exclusiveCount(int c){
      return c & EXCLUSIVE_MASK;
}

在统计读锁被每个线程持有的次数时,ReentrantReadWriteLock采用的是HoldCounter来实现的,具体如下:

// 持有读锁的线程重入的次数
static final class HoldCounter {
    // 重入的次数
    int count = 0;
    // 持有读锁线程的线程id
    final long tid = getThreadId(Thread.currentThread());
}

/**
 * 采用ThreadLocal机制,做到线程之间的隔离
 */
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

/**
 * 线程持有可重入读锁的次数
 */
private transient ThreadLocalHoldCounter readHolds;

/**
 * 缓存最后一个成功获取读锁的线程的重入次数,有两方面的好处:
 * 1、避免了通过访问ThreadLocal来获取读锁的信息,这个优化的前提是
 *    假设多数情况下,一个获取读锁的线程,使用完以后就会释放读锁,
 *    也就是说最后获取读锁的线程和最先释放读锁的线程大多数情况下是同一个线程
 * 2、因为ThreadLocal中的key是一个弱引用类型,当有一个变量持有HoldCounter对象时,
 *    ThreadLocalHolderCounter中最后一个获取锁的线程信息不会被GC回收掉
 */
private transient HoldCounter cachedHoldCounter;

/**
 * 第一个获取读锁的线程,有两方面的考虑:
 * 1、记录将共享数量从0变成1的线程
 * 2、对于无竞争的读锁来说进行线程重入次数数据的追踪的成本是比较低的
 */
private transient Thread firstReader = null;

/**
 * 第一个获取读锁线程的重入次数,可以参考firstReader的解析
 */
private transient int firstReaderHoldCount;

ReentrantReadWriteLock 源码解析

ReentrantReadWriteLock 一共有5个内部类,具体如下:

  • Sync:公平锁和非公平锁的抽象类
  • NonfairSync:非公平锁的具体实现
  • FairSync:公平锁的具体实现
  • ReadLock:读锁的具体实现
  • WriteLock:写锁的具体实现

我们从读锁ReadLock和写锁WriteLock的源码开始分析,然后顺着这个思路将整个ReentrantReadWriteLock中所有的核心源码(所有的包括内部类)进行分析。

ReadLock类源码解析

public static class ReadLock implements Lock, java.io.Serializable {
    
    private final Sync sync;

    /**
     * 通过ReentrantReadWriteLock中的公平锁或非公平锁来初始化sync变量
     */
    protected ReadLock(ReentrantReadWriteLock lock) {
          sync = lock.sync;
    }

    /**
     * 阻塞的方式获取锁,因为读锁是共享锁,所以调用acquireShared方法
     */
    public void lock() {
          sync.acquireShared(1);
    }

    /**
     * 可中断且阻塞的方式获取锁
     */
    public void lockInterruptibly() throws InterruptedException {
          sync.acquireSharedInterruptibly(1);
    }

    /**
     * 超时尝试获取锁,非阻塞的方式
     */
    public boolean tryLock(long timeout, TimeUnit unit)
          throws InterruptedException {
          return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    /**
     * 尝试获取写锁,非阻塞的方式
     */
    public boolean tryLock() {
          return sync.tryReadLock();
    }

    /**
     * 释放锁
     */
    public void unlock() {
          sync.releaseShared(1);
    }
}

接下来,我们重点看一下在公平锁和非公平锁下Sync.acquireSharedSync.releaseSharedSync.tryLock这3个方法的实现(acquireSharedInterruptiblytryAcquireSharedNanos是AQS中的方法,这里就不在讨论了,具体可以参考《深入浅出AQS源码解析》),其中Sync.acquireShared中核心调用的方法是Sync.tryAcquireSharedSync. releaseShared中核心调用的方法是Sync.tryReleaseSharedSync. tryLock中核心调用的方法是Sync.tryReadLock,所以我们重点分析Sync.tryAcquireShared方法、Sync.tryReleaseShared方法和sync.tryReadLock方法

Sync.tryAcquireShared方法

protected final int tryAcquireShared(int unused) {
    /**
     * 以共享锁的方式尝试获取读锁,步骤如下:
     * 1、如果资源已经被写锁获取了,直接返回失败
     * 2、如果读锁不需要等待(公平锁和非公平锁的具体实现有区别)、
     *    并且读锁未超过上限、同时设置读锁的state值成功,则返回成功
     * 3、如果步骤2失败了,需要进入fullTryAcquireShared函数再次尝试获取读锁
     */
    Thread current = Thread.currentThread();
    int c = getState();
    /**
     * 资源已经被写锁独占,直接返回false
     */
    if (exclusiveCount(c) != 0 &&
          getExclusiveOwnerThread() != current)
          return -1;
    int r = sharedCount(c);
    /**
     * 1、读锁不需要等待
     * 2、读锁未超过上限
     * 3、设置读锁的state值成功
     * 则返回成功
     */
    if (!readerShouldBlock() &&
          r < MAX_COUNT &&
          compareAndSetState(c, c + SHARED_UNIT)) {
          if (r == 0) {
            // 记录第一个获取读锁的线程信息
            firstReader = current;
            firstReaderHoldCount = 1;
          } else if (firstReader == current) {
            // 第一个获取读锁的线程再次获取锁(重入)
            firstReaderHoldCount++;
          } else {
            // 修改获取锁的线程的重入的次数
            HoldCounter rh = cachedHoldCounter;
            if (rh == null ||
                rh.tid != LockSupport.getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
          }
          return 1;
    }
    /**
     * 如果CAS失败再次获取读锁
     */
    return fullTryAcquireShared(current);
}

接下来看一下fullTryAcquireShared方法:

final int fullTryAcquireShared(Thread current) {
    /**
     * 调用该方法的线程都是希望获取读锁的线程,有3种情况:
     * 1、在尝试通过CAS操作修改state时由于有多个竞争读锁的线程导致CAS操作失败
     * 2、需要排队等待获取读锁的线程(公平锁)
     * 3、超过读锁限制的最大申请次数的线程
     */
    HoldCounter rh = null;
    for (;;) { // 无限循环获取锁
        int c = getState();
        // 已经被写线程获取锁了,直接返回
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
        // 需要被block的读线程(公平锁)
        } else if (readerShouldBlock()) {
            // 如果时当前线程
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                // 清理当前线程中重入次数为0的数据
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 当前线程获取锁失败
                if (rh.count == 0)
                    return -1;
            }
        }
        // 判断是否超过读锁的最大值
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 修改读锁的state值
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 最新获取到读锁的线程设置相关的信息
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++; // 当前线程重复获取锁(重入)
            } else {
                // 在readHolds中记录获取锁的线程的信息
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

Sync.tryReleaseShared方法

tryReleaseShared方法的实现逻辑比较简单,我们直接看代码中的注释

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    /**
     * 如果当前线程是第一个获取读锁的线程,有两种情况:
     * 1、如果持有锁的次数为1,直接释放成功
     * 2、如果持有锁的次数大于1,说明有重入的情况,需要次数减1
     */
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
      /**
       * 如果当前线程不是第一个获取读锁的线程
       * 需要更新线程持有锁的重入次数
       * 如果次数小于等于0说明有异常,因为只有当前线程才会出现持有锁的重入次数等于0或者1
       */
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    // 修改state的值
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 如果是最后一个释放读锁的线程nextc为0,否则不是
            return nextc == 0;
    }
}

sync.tryReadLock方法

tryReadLock的代码比较简单,就直接在将解析过程在注释中描述

final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) { // 无限循环获取读锁
        int c = getState();
        // 当前线程不是读线程,直接返回失败
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        // 读锁的总重入次数是否超过最大次数限制
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        /**
         * 通过CAS操作设置state的值,如果成功表示尝试获取读锁成功,需要做以下几件事情:
         * 1、如果是第一获取读锁要记录第一个获取读锁的线程信息
         * 2、如果是当前获取锁的线程和第一次获取锁的线程相同,需要更新第一获取线程的重入次数
         * 3、更新获取读锁线程相关的信息
         */
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

WriteLock类源码解析

public static class WriteLock implements Lock, java.io.Serializable {
    private final Sync sync;

    /**
     * 通过ReentrantReadWriteLock中的公平锁或非公平锁来初始化sync变量
     */
    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    /**
     * 阻塞的方式获取写锁
     */
    public void lock() {
        sync.acquire(1);
    }

    /**
     * 中断的方式获取写锁
     */
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    /**
     * 尝试获取写锁
     */
    public boolean tryLock( ) {
        return sync.tryWriteLock();
    }

    /**
     * 超时尝试获取写锁
     */
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    /**
     * 释放写锁
     */
    public void unlock() {
        sync.release(1);
    }
}

接下来,我们重点看一下在公平锁和非公平锁下Sync.tryAcquireSync.tryReleaseSync.tryWriteLock这几个核心方法是如何实现写锁的功能

Sync.tryAcquire方法

Sync.tryAcquire方法的逻辑比较简单,就直接在代码中注释,具体如下:

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    // 读写锁的次数
    int c = getState();
    // 写锁的次数
    int w = exclusiveCount(c);
    /*
     * 如果读写锁的次数不为0,说明锁可能有以下3中情况:
     * 1、全部是读线程占用资源
     * 2. 全部是写线程占用资源
     * 3. 读写线程都占用了资源(锁降级:持有写锁的线程可以去持有读锁),但是读写线程都是同一个线程
     */
    if (c != 0) {
        // 写线程不占用资源,第一个获取锁的线程也不是当前线程,直接获取失败
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 检查获取写锁的线程是否超过了最大的重入次数
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 修改state的状态,之所以没有用CAS操作来修改,是因为写线程只有一个,是独占的
        setState(c + acquires);
        return true;
    }
    /*
     * 写线程是第一个竞争锁资源的线程
     * 如果写线程需要等待(公平锁的情况),或者
     * 写线程的state设置失败,直接返回false
     */
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // 设置当前线程为owner
    setExclusiveOwnerThread(current);
    return true;
}

Sync.tryRelease方法

Sync.tryRelease方便的代码很简单,直接看代码中的注释

protected final boolean tryRelease(int releases) {
    // 如果释放锁的线程不持有锁,返回失败
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 获取写锁的重入的次数
    int nextc = getState() - releases;
    // 如果次数为0,需要释放锁的owner
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null); // 释放锁的owner
    setState(nextc);
    return free;
}

Sync.tryWriteLock方法

Sync.tryWriteLock这个方法也比较简单,就直接上代码了

final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    // 读锁或者写锁已经被线程持有
    if (c != 0) {
        int w = exclusiveCount(c);
        // 写锁第一次获取锁或者当前线程不是第一次获取写锁的线程(也就是不是owner),直接失败
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 超出写锁的最大次数,直接失败
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    // 竞争写锁的线程修改state,
    // 如果成功将自己设置成锁的owner,
    // 如果失败直接返回
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current); // 设置当前线程持有锁
    return true;
}

总结

  • 读锁和写锁的占用(重入)次数都是共用state字段,高位记录读锁,地位记录写锁,所以读锁和写锁的最大占用次数为2^16
  • 读锁和写锁都是可重入的
  • 读锁是共享锁,允许多个线程获取
  • 写锁是排他锁,只允许一个线程获取
  • 一个线程获取了读锁,在非公平锁的情况下,其他等待获取读锁的线程都可以尝试获取读锁,在公平锁的情况下,按照AQS同步队列的顺利来获取,如果队列前面有一个等待写锁的线程在排队,则后面所有等待获取读锁的线程都将无法获取读锁
  • 获取读锁的线程,不能再去申请获取写锁
  • 一个获取了写锁的线程,在持有锁的时候可以去申请获取读锁,在释放写锁以后,还会继续持有读锁,这就是所谓的锁降级
  • 读锁无法升级为写锁,原因是获取读锁的线程可能是多个,而写锁是独占的,不能多个线程持有,也就是说不支持锁升级

以上是关于深入浅出ReentrantReadWriteLock源码解析的主要内容,如果未能解决你的问题,请参考以下文章

深入浅出的分析 Set集合

深入浅出Vue.js--变化侦测

深入浅出node.js

【Gradle深入浅出】——Gradle配置(一)

深入浅出MyBatis

深入浅出RxJava