JUC系列LOCK框架系列之七 核心锁类之ReentrantReadWriteLock

Posted 顧棟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC系列LOCK框架系列之七 核心锁类之ReentrantReadWriteLock相关的知识,希望对你有一定的参考价值。

ReentrantReadWriteLock

文章目录

什么是读写锁

读写锁支持支持多个线程同时读,但是当一个写线程访问的时候会阻塞其他所有写和读的线程。

读写锁同时拥有写锁和读锁,通过分离读写锁提高程序的执行效率,在读多写少的场景中,并发性能比单一的排他锁有明显提升。

核心实现思想

读写锁内部重新定义了AQS中的同步状态变量的state。从低16位(0-15)用来记录写锁,用高16位(16-31)记录读锁。写锁的16位可以用来表示写锁被重入的次数。读锁的16位既可以表示一个读线程的重入次数,也可以表示多少个线程获得了读锁。state==0表示既无写锁又无读锁。

        // 共享式的位移量 读锁的使用的位数
        static final int SHARED_SHIFT   = 16;
        // 将1左移16位,结果作为,高位读锁(共享)的计算基本单位 65536
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        // 锁的最大值 65535
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        // 写(独占)锁的掩码 65535
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
		// 通过无符号右移快速查询读锁个数
		static int sharedCount(int c)     return c >>> SHARED_SHIFT; 
		// 通过位与运算快速判断写锁个数
		static int exclusiveCount(int c)  return c & EXCLUSIVE_MASK; 
                  -------------------  -------------------
              :   0000 0000 0000 0000  0000 0000 0000 0001
SHARED_UNIT   :   0000 0000 0000 0001  0000 0000 0000 0000
EXCLUSIVE_MASK:   0000 0000 0000 0000  1111 1111 1111 1111

计算演示

sharedCount(65536) ,通过以下计算得知:此时的读锁个数是1个。

-------------------  -------------------
0000 0000 0000 0001  0000 0000 0000 0000   // 65536的二进制
0000 0000 0000 0000  0000 0000 0000 0001   // 65536>>>16的值为1

exclusiveCount(5),通过以下计算得知:此时的读锁个数是5个。

-------------------  -------------------
0000 0000 0000 0000  0000 0000 0000 0101   // 5的二进制
0000 0000 0000 0000  1111 1111 1111 1111   // EXCLUSIVE_MASK 65535
0000 0000 0000 0000  0000 0000 0000 0101   // 5 & 65535 = 5

读写锁的特性

特性说明
公平性支持公平锁与非公平锁,默认非公平模式,吞吐量还是非公平高于公平
重入性该锁支持重进入,以读写线程为例:读线程在获取了读锁之后,能够再次获取读锁。而写线程在获取写锁之后,能够再次获取写锁,同时可以获取读锁
锁降级遵循读写锁、获取读锁在释放写锁的次序,写锁能够降级为读锁

读写锁的组成

类图

构造函数

在构造函数中会去构造ReadLock和WriteLock,WriteLock和ReadLock使用的是同一个AQS对象(Sync),这样就具备AQS的特性且互斥。

    /**
    * 无参构造函数 默认采用非公平策略
    */
	public ReentrantReadWriteLock() 
        this(false);
    

    /**
    * 含参构造函数 false非公平策略 true公平策略
    */
    public ReentrantReadWriteLock(boolean fair) 
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    

	// 读锁
    public static class ReadLock implements Lock, java.io.Serializable 
      
        private final Sync sync;

        protected ReadLock(ReentrantReadWriteLock lock) 
            sync = lock.sync;
        
    

	// 写锁
	public static class WriteLock implements Lock, java.io.Serializable 
      
        private final Sync sync;

        protected WriteLock(ReentrantReadWriteLock lock) 
            sync = lock.sync;
        
    

内部类Sync

构造方法

主要是对读锁计数器的初始化。

        Sync() 
            // 初始化本地线程计数器
            readHolds = new ThreadLocalHoldCounter();
            setState(getState()); // ensures visibility of readHolds
        

		// 利用ThreadLocal来存储本地线程计数器
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> 
            public HoldCounter initialValue() 
                return new HoldCounter();
            
        
		// 本地线程计数器 count用来存储数量 tid代表线程的ID值
        static final class HoldCounter 
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        
主要成员
        // 共享式的位移量 读锁的使用的位数
        static final int SHARED_SHIFT   = 16;
        // 将1左移16位,结果作为,高位读锁(共享)的计算基本单位
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        // 读(共享)锁的最大值
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        // 写(独占)锁的最大值
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
 		// 本地线程计数器
        private transient ThreadLocalHoldCounter readHolds;
        // 第一个读线程
        private transient Thread firstReader = null;
        // 第一个读线程的计数
        private transient int firstReaderHoldCount;
        // 缓存的计数器
        private transient HoldCounter cachedHoldCounter;
主要方法
方法名说明
int sharedCount(int c)
int exclusiveCount(int c)
abstract boolean readerShouldBlock();读请求时是否阻塞由公平和非公平去实现。公平模式下,判断是否存在队列且在其他结点之后,如果有前驱结点,则返回true,阻塞获取读锁。非公平模式下,如果其他线程获取了写锁,则返回true 阻塞当前线程获取读锁。
abstract boolean writerShouldBlock();写请求时是否阻塞由公平和非公平去实现。公平模式下,判断是否存在队列且在其他结点之后,如果有前驱结点,则返回true,阻塞获取写锁。非公平模式下,直接不阻塞,返回false。
final boolean tryRelease(int releases)
final boolean tryAcquire(int acquires)
final boolean tryReleaseShared(int unused)
final int tryAcquireShared(int unused)
final int fullTryAcquireShared(Thread current)
final boolean tryWriteLock()
final boolean tryReadLock()
final int getReadLockCount()返回当前读锁被获取的次数。该次数不等于获取读锁的线程数,一个线程连续获取了(重入)n次读书,那么这个方法的返回值是n,但是线程数只有1。
final boolean isWriteLocked()判断写锁是否已经被获取
final int getWriteHoldCount()返回当前写锁被获取的次数
final int getReadHoldCount()返回当前线程获取读锁的次数。
final int getCount()
Sync完整源码
    abstract static class Sync extends AbstractQueuedSynchronizer 
        private static final long serialVersionUID = 6317671515068378041L;

        /*
         * Read vs write count extraction constants and functions.
         * Lock state is logically divided into two unsigned shorts:
         * The lower one representing the exclusive (writer) lock hold count,
         * and the upper the shared (reader) hold count.
         * 锁的状态在逻辑上使用两个unsigned shorts:低位的表示独占(写入)锁保持计数,高位的表示共享(读取)锁保持计数。简单来说就是使用32位逻辑上被拆分成了两个16位,高16位存储读锁的状态 低32位存储写锁的状态
         */

        // 共享式的位移量 读锁的使用的位数
        static final int SHARED_SHIFT   = 16;
        // 将1左移16位,结果作为,高位读锁(共享)的计算基本单位
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        // 锁的最大值
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        // 写(独占)锁的掩码 用来快速计算写锁
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** Returns the number of shared holds represented in count  占有读锁的线程数量*/
        static int sharedCount(int c)     return c >>> SHARED_SHIFT; 
        /** Returns the number of exclusive holds represented in count  占有写锁的线程数量*/
        static int exclusiveCount(int c)  return c & EXCLUSIVE_MASK; 

        /**
         * A counter for per-thread read hold counts.
         * Maintained as a ThreadLocal; cached in cachedHoldCounter
         */
        static final class HoldCounter 
            int count = 0;
            // 使用线程ID,而不是引用,避免垃圾无法回收
            final long tid = getThreadId(Thread.currentThread());
        

        /**
         * ThreadLocal subclass. Easiest to explicitly define for sake
         * of deserialization mechanics.
         */
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> 
            public HoldCounter initialValue() 
                return new HoldCounter();
            
        

        /**
         * The number of reentrant read locks held by current thread.
         * Initialized only in constructor and readObject.
         * Removed whenever a thread's read hold count drops to 0.
         // 本地线程计数器
         */
        private transient ThreadLocalHoldCounter readHolds;

        /**
         * The hold count of the last thread to successfully acquire
         * readLock. This saves ThreadLocal lookup in the common case
         * where the next thread to release is the last one to
         * acquire. This is non-volatile since it is just used
         * as a heuristic, and would be great for threads to cache.
         *
         * <p>Can outlive the Thread for which it is caching the read
         * hold count, but avoids garbage retention by not retaining a
         * reference to the Thread.
         *
         * <p>Accessed via a benign data race; relies on the memory
         * model's final field and out-of-thin-air guarantees.
         // 缓存的计数器
         */
        private transient HoldCounter cachedHoldCounter;

        /**
         * firstReader is the first thread to have acquired the read lock.
         * firstReaderHoldCount is firstReader's hold count.
         *
         * <p>More precisely, firstReader is the unique thread that last
         * changed the shared count from 0 to 1, and has not released the
         * read lock since then; null if there is no such thread.
         *
         * <p>Cannot cause garbage retention unless the thread terminated
         * without relinquishing its read locks, since tryReleaseShared
         * sets it to null.
         *
         * <p>Accessed via a benign data race; relies on the memory
         * model's out-of-thin-air guarantees for references.
         *
         * <p>This allows tracking of read holds for uncontended read
         * locks to be very cheap.
         */
        // 第一个读线程
        private transient Thread firstReader = null;
        // 第一个读线程的计数
        private transient int firstReaderHoldCount;

        Sync() 
            readHolds = new ThreadLocalHoldCounter();
            // 设置AQS的状态
            setState(getState()); // ensures visibility of readHolds
        

        /*
         * Acquires and releases use the same code for fair and
         * nonfair locks, but differ in whether/how they allow barging
         * when queues are non-empty.
         */

        /**
         * Returns true if the current thread, when trying to acquire
         * the read lock, and otherwise eligible to do so, should block
         * because of policy for overtaking other waiting threads.
         */
        abstract boolean readerShouldBlock();

        /**
         * Returns true if the current thread, when trying to acquire
         * the write lock, and otherwise eligible to do so, should block
         * because of policy for overtaking other waiting threads.
         */
        abstract boolean writerShouldBlock();

        /*
         * Note that tryRelease and tryAcquire can be called by
         * Conditions. So it is possible that their arguments contain
         * both read and write holds that are all released during a
         * condition wait and re-established in tryAcquire.
         */
        protected final boolean tryRelease(int releases) 
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        

        protected final boolean tryAcquire(int acquires) 
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) 
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        

        protected final boolean tryReleaseShared(int unused) 
            Thread current = Thread.currentThread();
            if (firstReader == current) 
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
             else 
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) 
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                
                --rh.count;
            
            for (;;) 
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            
        

        private IllegalMonitorStateException unmatchedUnlockException() 
            return new IllegalMonitorStateException(
                "attempt to unlock read lock, not locked by current thread");
        

        protected final int tryAcquireShared(int unused) 
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) 
                if (r == 0) 
                    firstReader = current;
                    firstReaderHoldCount = 1;
                 else if (firstReader == current) 
                    firstReaderHoldCount++;
                 else 
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                
                return 1;
            
            return fullTryAcquireShared(current);
        

        /**
         * Full version of acquire for reads, that handles CAS misses
         * and reentrant reads not dealt with in tryAcquireShared.
         */
        final int fullTryAcquireShared(Thread current) 
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = null;
            for (;;) 
                int c = getState();
                if (exclusiveCount(c) != 0) 
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                 else if (readerShouldBlock()) 
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) 
                        // assert firstReaderHoldCount > 0;
                     else 
                        if (rh == null) 
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) 
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            
                        
                        if (rh.count == 0)
                            return -1;
                    
                
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) 
                    if (sharedCount(c) == 0) 
                        firstReader = current;
                        firstReaderHoldCount = 1;
                     else if (firstReader == current) 
                        firstReaderHoldCount++;
                     else 
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if<

以上是关于JUC系列LOCK框架系列之七 核心锁类之ReentrantReadWriteLock的主要内容,如果未能解决你的问题,请参考以下文章

JUC系列LOCK框架系列之五 核心锁类AbstractQueuedSynchonizer

JUC系列同步工具类之CyclicBarrier

JUC系列LOCK框架系列之三 同步工具类 CountDownLatch

JUC系列LOCK框架系列之四 同步工具类Semaphore

JUC系列同步工具类之ThreadLocal

JUC的LOCK框架系列一LOCK框架预览