Java 1.7 ReentrantReadWriteLock源码解析

Posted Mr-yuenkin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 1.7 ReentrantReadWriteLock源码解析相关的知识,希望对你有一定的参考价值。

由于本人水平与表达能力有限,有错误的地方欢迎交流与指正。

1 简介

可重入读写锁时基于AQS实现的,典型的使用方法如JDK1.7中的示例:

  class RWDictionary 
      private final Map<String, Data> m = new TreeMap<String, Data>();
      private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
      private final Lock r = rwl.readLock();
      private final Lock w = rwl.writeLock();
  
      public Data get(String key) 
          r.lock();
          try  return m.get(key); 
          finally  r.unlock(); 
      
      public String[] allKeys() 
          r.lock();
          try  return m.keySet().toArray(); 
          finally  r.unlock(); 
      
      public Data put(String key, Data value) 
          w.lock();
          try  return m.put(key, value); 
          finally  w.unlock(); 
      
      public void clear() 
          w.lock();
          try  m.clear(); 
          finally  w.unlock(); 
      
   

读锁使用的是AQS的共享模式,不会阻塞读锁,但是会阻塞写锁;写锁使用的是AQS的独占模式,读写锁都会被阻塞。读写锁是共用了一个Sync(AQS类),也就是说AQS中独占模式和共享模式是并存的。

Sync有两种实现方式,公平(FairSync)和非公平(NonfairSync)。公平的含义是指如果AQS的队列中有等待线程,则当前线程直接就放弃尝试获取锁,自觉的排队了;而非公平方式不一样,当前线程还是要尝试一下(仅仅一下,和AQS队列中第一个结点竞争获取锁),如果成功了,相当于插队成功了,但是如果失败了(就是tryAcquire或tryAcquireShared失败),还是要乖乖的排到最后去。

下面的是整个类的结构图:


Sync是个AQS类,它有两个子类FairSync和NonfairSync。ReadLock和WriteLock里有个成员变量sync(指向同个变量,FaireSync或NonfaireSync类型)。(UML图不是很熟,就这样文字描述了)

2 ReentrantReadWriteLock主类

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable 
    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** Performs all synchronization mechanics */
    final Sync sync;

    /**
     * false,默认锁是非公平的
     */
    public ReentrantReadWriteLock() 
        this(false);
    

    /**
     * Creates a new @code ReentrantReadWriteLock with
     * the given fairness policy.
     *
     * @param fair @code true if this lock should use a fair ordering policy
     */
    public ReentrantReadWriteLock(boolean fair) 
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    
    public ReentrantReadWriteLock.WriteLock writeLock()  return writerLock; 
    public ReentrantReadWriteLock.ReadLock  readLock()   return readerLock; 

主类有三个重要的成员变量:读锁、写锁和同步器。从构造函数可以看出读写锁的同步器默认是非公平的(NonfaireSync)。

3 ReadLock类

实现了Lock接口,并使用sync成员变量实现加锁、解锁功能

3.1 lock

public void lock() 
            sync.acquireShared(1);
        

 以AQS共享模式获取锁,参数值为1。如果系统中没有线程占有写锁,那么这个函数很快就会返回;否则,当前线程会一直阻塞,直至获取到锁。

3.2 lockInterruptibly

  public void lockInterruptibly() throws InterruptedException 
            sync.acquireSharedInterruptibly(1);
        

  以AQS共享模式获取锁,参数值为1。如果系统中没有线程占有写锁,那么这个函数很快就会返回;否则,当前线程会一直阻塞,直至获取到锁或被其他线程中断。

3.3 tryLock

  public  boolean tryLock() 
            return sync.tryReadLock();
        

 直接调用了sync的tryReadLock方法,这个方法和sync. tryAcquireShared基本一直(少了一个readerShouldBlock判断)。

3.4 带超时的tryLock

public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException 
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        

tryLock函数尝试获取锁,直到获取到或超时或被中断。如果当前没有线程占用写锁,会立即返回成功。和3.3的tryLock不一样,这个公平模式下会排队的。如果你不想排队又想支持超时,可以这么写代码:if(lock.tryLock() || lock.tryLock(timeout, unit) ) ...

3.5 newCondition

public Condition newCondition() 
            throw new UnsupportedOperationException();
        

 读锁不支持条件队列的。

4 WriteLock类

实现了Lock接口,并使用sync成员变量实现加锁、解锁功能

4.1 lock

public void lock() 
            sync.acquire(1);
        

  以AQS独占方式获取写锁,如果当前没有线程占有读锁和写锁,该函数会立即返回;如果当前线程已经获取写锁了,holdCount的值会加1;否则,会阻塞直至获取到锁。注意:如果当前线程已经获取到读锁了,紧接着就获取写锁就会死锁了。

4.2 lockInterruptibly

public void lockInterruptibly() throws InterruptedException 
            sync.acquireInterruptibly(1);
        

   以AQS独占方式 获取写锁直到被中断。其他跟lock一致。

4.3 tryLock

public boolean tryLock( ) 
            return sync.tryWriteLock();
        

直接调用了sync的tryWriteLock方法,这个方法和sync. tryAcquire基本一直(少了一个writerShouldBlock判断)。不管在公平模式还是非公平模式下,都不用排队。

4.4 带超时的tryLock

public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException 
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        

   tryLock函数尝试获取锁,直到获取到或超时或被中断。如果没有其他线程占有读锁和写锁,立马返回true。和4.3不一样,公平模式下会排队的。如果你又想插队又想支持超时,可以这么写:if(lock.tryLock() || lock.tryLock(timeout, unit) ) ...

4.5 newCondition

public Condition newCondition() 
            return sync.newCondition();
        

4.6 isHeldByCurrentThread

public boolean isHeldByCurrentThread() 
            return sync.isHeldExclusively();
        

  当前线程是否占有写锁。

4.7 getHoldCount

public int getHoldCount() 
            return sync.getWriteHoldCount();
        

 当前线程占有几个写锁。

5 Sync类

    读锁和写锁公用的同步器,有两个版本:公平的和非公平的。

5.1 内部类结构

abstract static class Sync extends AbstractQueuedSynchronizer 
        private static final long serialVersionUID = 6317671515068378041L;

        /*
         * AQS的state字段被拆成两部分了:高16位表示获取读锁的次数,低16位表示
         * 获取写锁的次数
         */
        static final int SHARED_SHIFT   = 16;
// 每次线程获取读锁成功就会执行state+=SHARED_UNIT操作,不是+1因为
// 高16位表示获取读锁的次数。
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
// 允许读或写获取锁的最大次数,都是65535
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** 获取当前读锁的总数 */
        static int sharedCount(int c)     return c >>> SHARED_SHIFT; 
        /** 获取当前写锁的总数 */
        static int exclusiveCount(int c)  return c & EXCLUSIVE_MASK; 

        /**
         * A counter for per-thread read hold counts.
         * Maintained as a ThreadLocal; cached in cachedHoldCounter
         */
        static final class HoldCounter 
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = Thread.currentThread().getId();
        

        /**
         * 每个线程都绑定一个HoldCounter对象
         */
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> 
            public HoldCounter initialValue() 
                return new HoldCounter();
            
        

        /**
         * The number of reentrant read locks held by current thread.
         * Initialized only in constructor and readObject.
         * Removed whenever a thread's read hold count drops to 0.
         */
// 第一个获取读锁线程的HoldCounter没有在里面管理,而是通过firstReader
// 和firstReaderHoldCount两个变量维护的。
        private transient ThreadLocalHoldCounter readHolds;  
        private transient HoldCounter cachedHoldCounter;
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;

        Sync() 
            readHolds = new ThreadLocalHoldCounter();
            setState(getState()); // ensures visibility of readHolds
        

        /*
         * Acquires and releases use the same code for fair and
         * nonfair locks, but differ in whether/how they allow barging
         * when queues are non-empty.
         */

        /**
         * Returns true if the current thread, when trying to acquire
         * the read lock, and otherwise eligible to do so, should block
         * because of policy for overtaking other waiting threads.
         */
        abstract boolean readerShouldBlock();

        /**
         * Returns true if the current thread, when trying to acquire
         * the write lock, and otherwise eligible to do so, should block
         * because of policy for overtaking other waiting threads.
         */
        abstract boolean writerShouldBlock();

5.2 tryAcquire

protected final boolean tryAcquire(int acquires) 
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) 
              // c!=0&&w==0说明有线程(包括当前线程)持有读锁,直接返回false
// c!=0&&w!=0&¤t!=当前持有锁的线程,直接返回false
// 注意:如果一个线程先获取读锁,紧接着获取写锁时会死锁的
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
// 写锁数量超过65535,直接抛异常了
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 走到这边说明这个锁是重入了的,不需要CAS了,直接设置state
                setState(c + acquires);
                return true;
            
// c==0或者重入的,如果写需要阻塞或者CAS设置state失败,直接返回false
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
// 设置独占线程标识
            setExclusiveOwnerThread(current);
            return true;
        

 tryAcquire 函数是尝试获取写锁:1.如果有读线程或者写线程且不是当前线程,直接失败;2.如果写锁的count超过了65535,直接失败;3.否则,这个线程能够拥有锁(eligible),队列策略允许(writerShouldBlock返回false)或者是重入的锁。

5.3 tryRelease

protected final boolean tryRelease(int releases) 
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
// state的高16位和低16位肯定都是大于releases的,可以直接相减
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
// 如果当前写锁释放后没有写锁了,置空独占标识
            if (free)
                setExclusiveOwnerThread(null);
// 设置新的state
            setState(nextc);
            return free;
        

 tryRelease函数一般情况下就是释放写锁,但是也有不一般的情况,就是在Condition中调用tryRelease,因此,releases参数可能会包含读和写两个锁的信息。

5.4 tryAcquireShared

protected final int tryAcquireShared(int unused) 
// 获取当前线程和state值         
            Thread current = Thread.currentThread();
            int c = getState();
// 如果有线程持有写锁且该线程不是当前线程,直接返回-1
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) 
                if (r == 0) 
                    firstReader = current;
                    firstReaderHoldCount = 1;
                 else if (firstReader == current) 
                    firstReaderHoldCount++;
                 else 
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != current.getId())
                        cachedHoldCounter = rh = readHolds.get();
// 为啥等于0的时候要set下?因为release时,count=0会执行readHolds.remove
// 方法,但是不会清空cachedHoldCounter。此时,同个线程再成功获取读锁时count
// 的值是0且readHolds已经为空了,因此要重新set下。
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                
                return 1;
            
            return fullTryAcquireShared(current);
        

  tryAcquireShared函数的参数(unused)没有用,主要是尝试获取读锁:

1.如果有线程持有写锁,直接返回失败;

2.否则,继续判断,如果队列策略允许(readerShouldBlock返回false)获取锁且CAS设置state成功,则设置读锁count的值。这一步并没有检查读锁重入的情况,被延迟到fullTryAcquireShared里了,因为大多数情况下不是重入的;

3.如果步骤2失败了,或许是队列策略返回false或许是CAS设置失败了等,则执行fullTryAcquireShared。

final int fullTryAcquireShared(Thread current)           
            HoldCounter rh = null;
// 是一个循环   
            for (;;) 
                int c = getState();
                if (exclusiveCount(c) != 0) 
// 有线程持有写锁且不是当前线程,直接失败
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
// 如果队列策略不允许,需要检查是否是读锁重入的情况。队列策略是否允许,分两种情况:
// 1.公平模式:如果当前AQS队列前面有等待的结点,返回false;2.非公平模式:如果
// AQS前面有线程在等待写锁,返回false(这样做的原因是为了防止写饥饿)。
                 else if (readerShouldBlock()) 
                    // 如果当前线程是第一个获取读锁的线程,则有资格获取读锁
                    if (firstReader == current) 
                        // assert firstReaderHoldCount > 0;
                     else 
                        if (rh == null) 
// 优先赋值成上一次获取读锁成功的cache,如果发现线程tid和当前线程不相等,在从
// ThreadLocal里获取
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != current.getId()) 
                                rh = readHolds.get();
                                if (rh.count == 0)
// 帮助GC
                                    readHolds.remove();
                            
                        
// 说明不是读锁重入的情况,直接返回失败了
                        if (rh.count == 0)
                            return -1;
                    
                
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) 
// 设置当前线程为第一个获取读锁的线程
                    if (sharedCount(c) == 0) 
                        firstReader = current;
                        firstReaderHoldCount = 1;
// 说明当前线程为第一个获取读锁的线程
                     else if (firstReader == current) 
                        firstReaderHoldCount++;
                     else 
// 其他获取读锁成功的情况
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != current.getId())
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    
                    return 1;
                
            
        

 fullTryAcquireShared函数处理上面tryAcquireShared中没有处理的读锁重入的问题或者CAS设置失败。其实,这函数代码和tryAcquireShared有些重复,但是把处理读锁重入的问题从tryAcquireShared中分离出来了。

5.5 tryReleaseShared

protected final boolean tryReleaseShared(int unused) 
            Thread current = Thread.currentThread();
// 如果当前线程是第一个获取读锁的,需要特殊处理(第一个reader不是用
// HoldCounter类型变量保存的)
            if (firstReader == current) 
                // assert firstReaderHoldCount > 0;
// 如果firstReaderHoldCount<=0也没报错啊(不会出现这种情况的)。
// 因为firstReaderHoldCount==1时,firstReader就是null了,条件
// firstReader==current就不会成立了
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
             else 
// 如果当前线程是最后一个获取锁的,就是cachedHoldCounter了
// 如果也不是最后一个获取锁的,就要从threadlocal里取了
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != current.getId())
                    rh = readHolds.get();
                int count = rh.count;
// count==1的话直接执行readHolds.remove;count<=0就报错
                if (count <= 1) 
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                
                --rh.count;
            
            for (;;) 
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                // Releasing the read lock has no effect on readers,
                // but it may allow waiting writers to proceed if
                // both read and write locks are now free.
// mark:当且仅当所有的读和写线程都释放锁了,才会返回true
                    return nextc == 0;
            
        

tryReleaseShared函数的作用就是释放读锁,需要注意两点:1、如果一个线程没有获取过读锁,执行release方法会报异常;2、当且仅当所有的读和写线程都释放了,这个函数才会返回true。

5.6 tryWriteLock

final boolean tryWriteLock() 
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) 
                int w = exclusiveCount(c);
// 如果有线程持有读锁(自己也不行)直接返回false
// 如果不是重入获取写锁的直接返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            
// c==0或者写锁重入
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        

  tryWriteLock函数会被写锁调用,和tryAcquire基本一致除了少调用一个writerShouldBlock函数,公平和非公平两种模式都允许插队(barging in)相当于是非公平模式了。

5.7 tryReadLock

final boolean tryReadLock() 
            Thread current = Thread.currentThread();
            for (;;) 
                int c = getState();
// 如果有线程持有写锁且该线程不是当前线程,直接返回-1
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) 
                    if (r == 0) 
                        firstReader = current;
                        firstReaderHoldCount = 1;
                     else if (firstReader == current) 
                        firstReaderHoldCount++;
                     else 
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != current.getId())
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    
                    return true;
                
            
        

tryReadLock函数被读锁调用(tryLock),和tryAcquireShared函数基本一致,除了少调用一个readerShouldBlock方法,公平和非公平两种模式都允许插队(barging in)相当于是非公平模式了。

6 NonfairSync类

static final class NonfairSync extends Sync 
        private static final long serialVersionUID = -8159625535654395037L;
// 写线程无条件插队
        final boolean writerShouldBlock() 
            return false; // writers can always barge
        
        final boolean readerShouldBlock() 
            // 为了方式写线程饥饿的情况,如果AQS等待队里的第一个线程是独占的,
            // 当前读线程就阻塞。
            return apparentlyFirstQueuedIsExclusive();
        
    

非公平锁的实现类。

7 FairSync类

static final class FairSync extends Sync 
        private static final long serialVersionUID = -2274990926593161451L;
// 如果AQS队列里有等待的线程,当前线程就阻塞
        final boolean writerShouldBlock() 
            return hasQueuedPredecessors();
        
// 如果AQS队列里有等待的线程,当前线程就阻塞
        final boolean readerShouldBlock() 
            return hasQueuedPredecessors();
        
    

 公平锁的实现类writerShouldBlock和readerShouldBlock实现方式完全一样,只要队列里有其他线程在等待,当前线程就阻塞,FIFO模式。





以上是关于Java 1.7 ReentrantReadWriteLock源码解析的主要内容,如果未能解决你的问题,请参考以下文章

从 1.7 更新到 Java 1.8 以使用 Heroku 会导致任何问题

Java 1.7 ThreadPoolExecutor源码解析

升级到Java8后,javac仍然显示1.7

为啥 Maven 使用 JDK 1.6 但我的 java -version 是 1.7

IntelliJ IDEA 13 使用 Java 1.5,尽管设置为 1.7

Java 1.7 NQuery