jdk1.8 J.U.C并发源码阅读------ReentrantReadWriteLock源码解析

Posted Itzel_yuki

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jdk1.8 J.U.C并发源码阅读------ReentrantReadWriteLock源码解析相关的知识,希望对你有一定的参考价值。

一、继承关系

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable
实现了ReadWriteLock接口和Serializable接口
ReadWriteLock接口内部包含两个抽象方法:

Lock readLock();//返回一个读锁(共享锁)
Lock writeLock();//返回一个写锁(独占锁)
说明:
(1)ReentrantReadWriteLock中包含了读锁和写锁,分别有公平和非公平两种实现。
(2)该类中包含一个继承了AQS的Sync类型的对象,Sync抽象类有两个子类NonfairSync和FairSync,分别实现公平和非公平两种方式。在new ReentrantReadWriteLock对象时可以传递一个boolean类型的参数指定是读写锁是采用公平还是非公平策略实现。
(3)readLock和writeLock中都包含一个成员变量sync,lock和unlock都是通过调用sync对象的相应方法实现。

二、成员变量

/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;//读锁
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;//写锁
/** Performs all synchronization mechanics */
final Sync sync;//AQS子类

三、内部类

(1)Sync:继承自AQS
总结:
写线程获取锁成功情况:
(1)有线程独占锁(可能该线程还共享的占有锁),且该线程是自身,这时写线程获取锁成功。
(2)没有线程占有锁,锁处于空闲状态,此时该写线程可能占有锁成功。
(3)当只有读线程占有锁(无论是否是自身),或者有线程独占锁但不是自身,这两种情况获取锁失败。
读线程获取锁成功情况:
(1)有写线程占有锁,但该写线程是自身,获取锁成功
(2)没有写线程占有锁,获取锁成功
(3)当有写线程占有锁,且不是自身时,获取锁失败。
关于锁降级:写锁--->读锁
当该线程的写线程占有锁,该线程又想以读方式占有锁,此时获取成功,该线程的分别以读和写的方式占有锁,若此时,该线程写方式释放了锁,则此时锁由写锁降级成了读锁。(顺序:写--读---写释放)

abstract static class Sync extends AbstractQueuedSynchronizer 
        private static final long serialVersionUID = 6317671515068378041L;
		//由于AQS中state只有一个变量保存锁的数目,这里有两种锁的数目需要保存,故采用将state划分为高16位和低16位,高位保存共享锁的数目,低位保存独占锁的数目
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);//1左移16位为0x100,共享锁的最小单位
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;//共享锁和独占锁的最大数目为0xff
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//独占锁的掩码

        /** Returns the number of shared holds represented in count  */
        static int sharedCount(int c)     return c >>> SHARED_SHIFT; //右移16位,获取高位表示的共享锁的数目
        /** Returns the number of exclusive holds represented in count  */
        static int exclusiveCount(int c)  return c & EXCLUSIVE_MASK; //与0xff进行&操作,获得低位的独占锁的数目

        /**
         * A counter for per-thread read hold counts.
         * Maintained as a ThreadLocal; cached in cachedHoldCounter
         */
		 //每一个线程拥有锁的数目
        static final class HoldCounter 
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        

        /**
         * ThreadLocal subclass. Easiest to explicitly define for sake
         * of deserialization mechanics.
         */
		 //将holdCounter与Thread结合起来
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> 
            public HoldCounter initialValue() 
                return new HoldCounter();
            
        

        /**
         * The number of reentrant read locks held by current thread.
         * Initialized only in constructor and readObject.
         * Removed whenever a thread's read hold count drops to 0.
         */
        private transient ThreadLocalHoldCounter readHolds;

        /**
         * The hold count of the last thread to successfully acquire
         * readLock. This saves ThreadLocal lookup in the common case
         * where the next thread to release is the last one to
         * acquire. This is non-volatile since it is just used
         * as a heuristic, and would be great for threads to cache.
         *
         * <p>Can outlive the Thread for which it is caching the read
         * hold count, but avoids garbage retention by not retaining a
         * reference to the Thread.
         *
         * <p>Accessed via a benign data race; relies on the memory
         * model's final field and out-of-thin-air guarantees.
         */
        private transient HoldCounter cachedHoldCounter;

        /**
         * firstReader is the first thread to have acquired the read lock.
         * firstReaderHoldCount is firstReader's hold count.
         *
         * <p>More precisely, firstReader is the unique thread that last
         * changed the shared count from 0 to 1, and has not released the
         * read lock since then; null if there is no such thread.
         *
         * <p>Cannot cause garbage retention unless the thread terminated
         * without relinquishing its read locks, since tryReleaseShared
         * sets it to null.
         *
         * <p>Accessed via a benign data race; relies on the memory
         * model's out-of-thin-air guarantees for references.
         *
         * <p>This allows tracking of read holds for uncontended read
         * locks to be very cheap.
         */
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;

		//构造方法
        Sync() 
            readHolds = new ThreadLocalHoldCounter();
            setState(getState()); // ensures visibility of readHolds
        

       //抽象方法,需要子类实现
        abstract boolean readerShouldBlock();

       //抽象方法,需要子类实现
        abstract boolean writerShouldBlock();
		
		//尝试释放独占锁。首先检查当前线程是否是拥有锁的线程,不是则抛出异常IllegalMonitorStateException,接着计算释放锁后state的低16位是否为0,为0表示当前线程已经完全释放了独占锁,返回true;不为0表示当前线程只是部分释放了独占锁,返回false
        protected final boolean tryRelease(int releases) 
		//当前线程是否独占锁,没有则抛出异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
			//计算释放releases数目的独占锁后state的值
            int nextc = getState() - releases;
			//计算释放锁后独占锁的数目
            boolean free = exclusiveCount(nextc) == 0;
			//free为true表示当前独占锁已经全部释放完成,设置当前拥有独占锁的线程为null
            if (free)
                setExclusiveOwnerThread(null);
			//更新state
            setState(nextc);
            return free;
        
		//尝试获取独占锁。
		//1、如果当前锁只有线程以共享模式占有,则获取失败
		//2、如果当前锁有线程独占模式占有(可能同时存在共享占有锁的线程),且该线程不是本线程,则获取失败
		//3、当前线程以独占的模式占有锁(可能同时存在共享占有锁的线程),则当前线程再次请求获取锁时,由于锁的可重入性,因此获取成功
		//4、当前锁没有被任何线程占有,查看writerShouldBlock,如果不用阻塞则CAS设置state的状态,将当前锁的拥有者设置为当前线程,锁获取成功。
		//其中1,2两种情况锁获取失败;3、4两种情况锁获取成功。
        protected final boolean tryAcquire(int acquires) 
            Thread current = Thread.currentThread();
			//c表示独占锁和共享锁的混合计数
            int c = getState();
			//w表示独占锁的数目
            int w = exclusiveCount(c);
            if (c != 0) 
                // c!=0&&w==0表示当前占锁的占有模式是共享的
				//c!=0&&w!=0&¤t != getExclusiveOwnerThread()表示当前独占锁的线程不是当前线程
				//以上两种方式当前线程独占获取锁失败
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
				//表示当前占有锁的线程是当前线程,因此可重入
				//锁的占有数目超过了最大数目,抛出异常
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 更新state
                setState(c + acquires);
				//可重入锁获取成功
                return true;
            
			//c==0表示当前锁没有被任何线程占有
			//首先查看writerShouldBlock是否需要阻塞当前线程,是则在此处阻塞,否则CAS设置state,并且将锁的拥有者设置成当前线程。
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        
		//共享模式释放锁,state每次都-1,不像独占模式可以一次释放多个锁。
		//直接更新state的值,返回更新后state是否为0,为0表示当前锁处于空闲状态,返回true,否则返回false。
        protected final boolean tryReleaseShared(int unused) 
            Thread current = Thread.currentThread();
            if (firstReader == current) 
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
             else 
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) 
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                
                --rh.count;
            
            for (;;) 
                int c = getState();
				//释放锁后state的值
                int nextc = c - SHARED_UNIT;
				//更新state
                if (compareAndSetState(c, nextc))
                    //返回state的状态,若为0表示当前锁是空闲的,返回true
					//共享锁的释放对readers没有影响,但是对writers有影响。
					//在CLH队列中共享模式的node一旦获得了锁,就会无条件传播向后继的shared模式的node,直至遇到一个非shared模式的节点。
                    return nextc == 0;
            
        

        private IllegalMonitorStateException unmatchedUnlockException() 
            return new IllegalMonitorStateException(
                "attempt to unlock read lock, not locked by current thread");
        
		//共享模式获取锁。
		//1、没有线程以独占的模式占有锁,锁的持有者都是reader,则该线程以共享模式获取锁成功。
		//2、有线程以独占模式获取锁,该线程是本线程,则该线程以共享模式获取锁成功。
		//以上两种方式是能成功获取共享锁的两种情况。
        protected final int tryAcquireShared(int unused) 
           
            Thread current = Thread.currentThread();
            int c = getState();
			//有线程以独占的模式占有锁,并且该线程不是本线程,返回-1
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
			//表示该线程能成功获取锁。
			//r表示共享模式占有锁的线程个数
            int r = sharedCount(c);
			//readerShouldBlock不用阻塞,并且r小于最大锁数目,则CAS设置当前线程共享占有锁
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) 
				//
                if (r == 0) 
                    firstReader = current;
                    firstReaderHoldCount = 1;
                 else if (firstReader == current) 
                    firstReaderHoldCount++;
                 else 
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                
                return 1;
            
            return fullTryAcquireShared(current);
        

       //共享模式获取锁
        final int fullTryAcquireShared(Thread current) 
            HoldCounter rh = null;
            for (;;) 
                int c = getState();
                if (exclusiveCount(c) != 0) 
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                 else if (readerShouldBlock()) 
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) 
                        // assert firstReaderHoldCount > 0;
                     else 
                        if (rh == null) 
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) 
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            
                        
                        if (rh.count == 0)
                            return -1;
                    
                
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) 
                    if (sharedCount(c) == 0) 
                        firstReader = current;
                        firstReaderHoldCount = 1;
                     else if (firstReader == current) 
                        firstReaderHoldCount++;
                     else 
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    
                    return 1;
                
            
        

        //写线程获取锁,不考虑writerShouldBlock
        final boolean tryWriteLock() 
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) 
                int w = exclusiveCount(c);
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            
			//当前独占有锁的线程是自身,或者当前锁是空闲状态,这两种情况下写线程获取锁成功
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        

        //读线程获取锁,不考虑ReadShouldBlock
        final boolean tryReadLock() 
            Thread current = Thread.currentThread();
            for (;;) 
                int c = getState();
				//当前有线程以独占的方式占有锁,并且该线程不是本线程,则写线程获取锁失败。
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) 
                    if (r == 0) 
                        firstReader = current;
                        firstReaderHoldCount = 1;
                     else if (firstReader == current) 
                        firstReaderHoldCount++;
                     else 
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    
                    return true;
                
            
        

        protected final boolean isHeldExclusively() 
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        

        // Methods relayed to outer class

        final ConditionObject newCondition() 
            return new ConditionObject();
        

        final Thread getOwner() 
            // Must read state before owner to ensure memory consistency
            return ((exclusiveCount(getState()) == 0) ?
                    null :
                    getExclusiveOwnerThread());
        

        final int getReadLockCount() 
            return sharedCount(getState());
        

        final boolean isWriteLocked() 
            return exclusiveCount(getState()) != 0;
        

        final int getWriteHoldCount() 
            return isHeldExclusively() ? exclusiveCount(getState()) : 0;
        

        final int getReadHoldCount() 
            if (getReadLockCount() == 0)
                return 0;

            Thread current = Thread.currentThread();
            if (firstReader == current)
                return firstReaderHoldCount;

            HoldCounter rh = cachedHoldCounter;
            if (rh != null && rh.tid == getThreadId(current))
                return rh.count;

            int count = readHolds.get().count;
            if (count == 0) readHolds.remove();
            return count;
        

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException 
            s.defaultReadObject();
            readHolds = new ThreadLocalHoldCounter();
            setState(0); // reset to unlocked state
        

        final int getCount()  return getState(); 
    



(2)NonfairSync:非公平锁

static final class NonfairSync extends Sync 
        private static final long serialVersionUID = -8159625535654395037L;
		//不阻塞写线程
        final boolean writerShouldBlock() 
            return false; // writers can always barge
        
        final boolean readerShouldBlock() 
            //如果CLH队列中首节点的下一个节点是独占模式的节点,则该reader线程应该阻塞
            return apparentlyFirstQueuedIsExclusive();
        
    

(3)FairSync:公平锁

static final class FairSync extends Sync 
        private static final long serialVersionUID = -2274990926593161451L;
		//如果CLH队列中首节点的下一个节点不是该线程的节点,则阻塞。按照FIFO顺序获取锁
        final boolean writerShouldBlock() 
            return hasQueuedPredecessors();
        
		//如果CLH队列中首节点的下一个节点不是该线程的节点,则阻塞。按照FIFO顺序获取锁
        final boolean readerShouldBlock() 
            return hasQueuedPredecessors();
        
    

非公平锁和公平锁的区别:
(1)公平锁无论writerShouldBlock或者readerShouldBlock都要遵循FIFO的规则,只有当CLH队列中该node位于head的下一个节点,才不需要阻塞该线程。
(2)非公平锁对于writeShouldBlock,返回false,都不阻塞。
(3)非公平锁对于readerShouldBlock,查看CLH队列中head的next节点是否是一个独占的节点,如果是则返回true,需要阻塞当前线程,否则不需要阻塞该线程。
(好像是为了防止独占的节点很长时间都不能获取到锁)

(4)ReadLock:读锁

public static class ReadLock implements Lock, java.io.Serializable 
        private static final long serialVersionUID = -5992448646407690164L;
        private final Sync sync;//两个子类,公平和非公平的实现方式

       
        protected ReadLock(ReentrantReadWriteLock lock) 
            sync = lock.sync;
        

        //调用AQS的acquireShared,tryAcquireShared在sync中实现,获取失败则调用doAcquireShared在CLH队列中获取
        public void lock() 
            sync.acquireShared(1);
        

        //调用AQS中的acquireSharedInterruptibly方法,tryAcquireShared在sync中实现,获取失败则调用doAcquireSharedInterruptibly在CLH队列中获取
        public void lockInterruptibly() throws InterruptedException 
            sync.acquireSharedInterruptibly(1);
        

        //调用sync中的tryReadLock,进行一次尝试获取,不进入CLH队列中。
        public boolean tryLock() 
            return sync.tryReadLock();
        

        //调用AQS中的tryAcquireSharedNanos,tryAcquire失败则调用doAcquireSharedNanos在队列中获取
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException 
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        

       //调用AQS中的releaseShared方法,tryReleaseShared(Sync中)成功则调用doReleaseShared处理唤醒后继等操作
        public void unlock() 
            sync.releaseShared(1);
        

        public Condition newCondition() 
            throw new UnsupportedOperationException();
        

        public String toString() 
            int r = sync.getReadLockCount();
            return super.toString() +
                "[Read locks = " + r + "]";
        
    


(5)WriteLock:写锁

public static class WriteLock implements Lock, java.io.Serializable 
        private static final long serialVersionUID = -4992448646407690164L;
        private final Sync sync;//保存一个sync类型的对象,两种实现,公平和非公平,默认是非公平实现

        
        protected WriteLock(ReentrantReadWriteLock lock) 
            sync = lock.sync;
        

        //直接调用AQS的acquire方式获取锁,tryAcquire在Sync中实现,考虑writeShouldBlock
        public void lock() 
            sync.acquire(1);
        

       //直接调用AQS的acquireInterruptibly方式获取锁,tryAcquire在Sync中实现,考虑writeShouldBlock
        public void lockInterruptibly() throws InterruptedException 
            sync.acquireInterruptibly(1);
        

        //调用Sync中的tryWriteLock直接获取锁,不用入CLH队列,忽略writeShouldBlock
        public boolean tryLock( ) 
            return sync.tryWriteLock();
        

        //调用AQS中的tryAcquireNanos实现,需要入队
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException 
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        

        //调用AQS中的release方法释放锁,tryRelease在Sync中实现,tryRelease成功,需要unparkSuccessor唤醒后继
        public void unlock() 
            sync.release(1);
        

        
        public Condition newCondition() 
            return sync.newCondition();
        

        
        public String toString() 
            Thread o = sync.getOwner();
            return super.toString() + ((o == null) ?
                                       "[Unlocked]" :
                                       "[Locked by thread " + o.getName() + "]");
        

        
        public boolean isHeldByCurrentThread() 
            return sync.isHeldExclusively();
        

        
        public int getHoldCount() 
            return sync.getWriteHoldCount();
        
    

四、构造方法

//默认创建一个非公平的sync锁
	public ReentrantReadWriteLock() 
        this(false);
    
	
	public ReentrantReadWriteLock(boolean fair) 
	//sync提供两种方式实现,公平和非公平,默认非公平
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);//构造方法要求传递一个sync对象
        writerLock = new WriteLock(this);//构造方法要求传递一个sync对象
    
	ReadWriteLock接口中两个抽象方法的具体实现:
	public ReentrantReadWriteLock.WriteLock writeLock()  return writerLock; 
    public ReentrantReadWriteLock.ReadLock  readLock()   return readerLock; 

以上是关于jdk1.8 J.U.C并发源码阅读------ReentrantReadWriteLock源码解析的主要内容,如果未能解决你的问题,请参考以下文章

jdk1.8 J.U.C并发源码阅读------ReentrantLock源码解析

jdk1.8 J.U.C并发源码阅读------ReentrantLock源码解析

jdk1.8 J.U.C并发源码阅读------CountDownLatch源码解析

jdk1.8 J.U.C并发源码阅读------CountDownLatch源码解析

jdk1.8 J.U.C并发源码阅读------CyclicBarrier源码解析

jdk1.8 J.U.C并发源码阅读------CyclicBarrier源码解析