JDK类库源码分析系列2-AbstractQueuedSynchronizer-ReentrantReadWriteLock

Posted _微风轻起

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK类库源码分析系列2-AbstractQueuedSynchronizer-ReentrantReadWriteLock相关的知识,希望对你有一定的参考价值。

一、简介说明

​ 这一篇我们来介绍下AbstractQueuedSynchronizer的另一个应用类ReentrantReadWriteLock。与之对应的是类似前面一篇的ReentrantLock,不过ReentrantLock其是以独占的模式获取锁,目前我们这个ReentrantReadWriteLock是有两用的,一种是读锁,也就是共享锁,另一种是Write写锁,也就是独占锁。

​ 下面我们就来具体梳理下这个类。

​ 这里我们可以通过一个很简单的例子来梳理逻辑。

public class ReentrantWriteReadLockMain {

    public static void main(String[] args) {

        ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
        ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
        new Thread(new Runnable() {
            @Override
            public void run() {
                readLock.lock();
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                readLock.lock();
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                readLock.lock();
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                readLock.lock();
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        System.out.println("Hello ...........");
    }

}

​ 在这个基础上,可以看自己想梳理什么过程,再进行相应的变动。

一、基本结构

1、ReentrantReadWriteLock

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {

​ 其有实现ReadWriteLock接口。

2、ReadWriteLock

public interface ReadWriteLock {

    Lock readLock();

    Lock writeLock();
}

​ 这个接口其定义了两个方法,一个获取读锁,一个获取写锁。

二、变量

1、readerLock

private final ReentrantReadWriteLock.ReadLock readerLock;

​ 读锁对象。

2、writerLock

private final ReentrantReadWriteLock.WriteLock writerLock;

​ 写锁对象。

3、sync

final Sync sync;

Sync同步对象。

三、Sync子类

1、结构

abstract static class Sync extends AbstractQueuedSynchronizer {

​ 其也是直接继承的AQS

2、变量

1)、计算值

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count. */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count. */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

​ 我们将这4个变量与两个获取值的方法放在一起,这里的SHARED_SHIFT是一个转换基数。

SHARED_UNIT这个是被ReadLock读锁用来计数的,也就是设置status的值,每个获取锁的线程分别计数。其的初始值是1 << SHARED_SHIFT,也就是往前移动16未,然后其是往前高位累加的,也就是说其初始值是1 << SHARED_SHIFT,然后同一线程获取第二次就是SHARED_UNIT + SHARED_UNIT,也就是高位+1,这里要理解,然后要获取其的计算就是sharedCount(int c)方法,也就是c >>> SHARED_SHIFT将其再往后移16位。

MAX_COUNT是单个线程最大的获取status

EXCLUSIVE_MASK这个是用于独占锁计数的,其的初始值是c & EXCLUSIVE_MASK,也就是其从低位到高位都是1,然后其status的获取是通过exclusiveCount(int c)(c & EXCLUSIVE_MASK),这里是&运算,其是从低位0到高位。

这里用左移、右移、且这些计算是能加快速度,然后共享与独占一个高位计数一个低位,这样就能通过status完成两种模式即ReadWrite的总计数了。

2)、readHolds

private transient ThreadLocalHoldCounter readHolds;

​ 这个就是前面的ThreadLocalHoldCounter,被读锁使用,如果线程的status为0,就会将其remove

3)、cachedHoldCounter

private transient HoldCounter cachedHoldCounter;

​ 缓存值,指向最后使用的线程对应的HoldCounter

4)、firstReader

private transient Thread firstReader;

​ 指向队首的线程。

5)、firstReaderHoldCount

private transient int firstReaderHoldCount;

​ 队首的计数。

3、方法

1)、readerShouldBlock()

abstract boolean readerShouldBlock();

​ 读是否需要被阻塞。

2)、writerShouldBlock()

abstract boolean writerShouldBlock();

​ 写是否需要被阻塞。

3)、tryAcquire(int acquires)

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

​ 尝试获取独占锁,整体与我们前面文章的流程差不多。

​ 这里是先判断c != 0也就是有没有已经获取锁,如果有就是这个分支:判断w == 0,这里我们要注意为什么c !=0w == 0,因为我们有提到,高位计数是用于共享的,而低位才是用于独占的,这个分支就是如果共享有获取,但目前独占锁还没有获取、或当前线程并不是获取锁的线程就返回false获取失败。

​ 如果独占已经有获取了,并且目前独占线程就是当前线程,判断是否有超过最大限制MAX_COUNT,没有超过就通过setState(c + acquires)设置status。返回获取成功。

​ 如果最开始c ==0,也就是共享独占都没有获取。先通过writerShouldBlock()判断是否阻塞,如果true或者CAS设置status竞争失败,我们返回false。如果不满足这些条件,就表示我们CAS设置status成功了,就可以设置独占线程为当前线程了,因为我们是最开始获取的c = = 0

4)、tryRelease(int releases)

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

​ 尝试释放锁,首先是通过isHeldExclusively(),判断当前线程是不是占有线程,然后判断本次释放后status释放为0,如果是就表示释放完成,就通过setExclusiveOwnerThread(null)将独占线程置为null,最后通过setState(nextc)设置status

protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

5)、tryAcquireShared(int unused)

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null ||
                rh.tid != LockSupport.getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

​ 这里我们首先明白:

if (exclusiveCount(c) != 0 &&
    getExclusiveOwnerThread() != current)
    return -1;

​ 这里的exclusiveCount(c) != 0,也就是独占不为0(共享是高位,则低位&运算必定为0)。也就是说,如果有独占写锁,并且这个写锁线程不是当前线程,则返回-1表示获取失败。如果当前ReentrantReadWriteLock还没有用到写锁,则就往后继续。

​ 判断如果不需要readerShouldBlock()并且目前的共享计数sharedCount(c)没有超过MAX_COUNT,就能通过compareAndSetState(c, c + SHARED_UNIT)设置status了,c + SHARED_UNIT就是高位计数,用于共享读锁计数统计。然后如果设置成功,则判断,如果其sharedCount(c)0,也就是还没有共享获取,就将其设置为firstReader指向头,firstReaderHoldCount设置为1。如果其不为0,也就是现在有共享获取,就判断头线程是不是当前线程,是的话就自增。如果也不是本身,就获取本次的上次计数缓存cachedHoldCounter,如果最开始为null(rh == null),或者不是当前线程,则通过readHolds(ThreadLocal)获取到当前线程的HoldCounter,然后计数rh.count++,最后返回1表示获取成功。

​ 同时我们看到上面这些是单个流程,如果这个单个获取失败,例如CAS设置status获取失败,就通过下面的fullTryAcquireShared(current)来满获取,也就是for(;;)

6)、fullTryAcquireShared(Thread current)

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null ||
                        rh.tid != LockSupport.getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

​ 这个大概的与上面是相似的,主要是for (;;),这里就不具体分析了。

7)、tryReleaseShared(int unused)

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null ||
            rh.tid != LockSupport.getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

​ 共享释放,这个流程也比较简单,如果头线程是当前线程,当firstReaderHoldCount计数为1,就表示其已经要释放完了,就置为null,不是就自减。

​ 不是当前线程。就是cachedHoldCounterreadHolds使用,也是判断是否需要readHolds.remove(),并--rh.count。最后再是for(;;)设置status

8)、tryWriteLock()

final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

​ 尝试写锁获取,写锁是独占锁,所以如果current != getExclusiveOwnerThread(),或还没有独占,就返回false。如果为status == 0,就CAS设置status如果成功就设置当前线程为独占线程,如果失败就false

9)、tryReadLock()

final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

​ 尝试读锁获取,这里大体与上面的tryAcquireSh

以上是关于JDK类库源码分析系列2-AbstractQueuedSynchronizer-ReentrantReadWriteLock的主要内容,如果未能解决你的问题,请参考以下文章

JDK类库源码分析系列2-AbstractQueuedSynchronizer-CountDownLatch

JDK类库源码分析系列2-AbstractQueuedSynchronizer-ReentrantReadWriteLock

JDK类库源码分析系列2-AbstractQueuedSynchronizer-Semaphore

JDK源码分析实战系列-ThreadLocal

源码分析系列1:HashMap源码分析(基于JDK1.8)

《JDK源码分析》相关系列目录(JAVA 小虚竹)