ReentrantReadWriteLock 源码分析

Posted zhuxudong

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ReentrantReadWriteLock 源码分析相关的知识,希望对你有一定的参考价值。

ReentrantReadWriteLock

1)获取顺序:
非公平模式(默认):连续竞争的非公平锁可能无限期地推迟一个或多个 reader 或 writer 线程,但吞吐量通常要高于公平锁。
公平模式:当某个线程释放当前保持的锁时,可以为等待时间最长的单个 writer 线程分配写入锁,如果有一组等待时间大于所有正在等待的 writer 线程的 reader 线程,则将为该组分配读取锁。
2)重入:此锁允许 reader 和 writer 按照 ReentrantLock 的样式重新获取读取锁或写入锁。
3)锁降级:重入还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。
4)锁获取的中断:读取锁和写入锁都支持锁获取期间的中断。
5)同步状态值的高 16 位保存读取锁被持有的次数,低 16 位保存写入锁被持有的次数。

创建实例

    /** 内部类实现的读锁 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 内部类实现的写锁 */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** 实现锁操作的同步器 */
    final Sync sync;

    /**
     * 创建一个非公平的读写锁实例
     */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /**
     * 1)fair=true,创建一个公平的读写锁实例。
     * 2)fair=false,创建一个非公平的读写锁实例。
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        // 基于当期实例创建读锁和写锁
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

读锁获取:ReadLock#lock

        /**
         * 1)如果写锁没有被其他线程持有,则成功获取读锁并返回
         * 2)写锁被其他线程持有,则进入阻塞状态。
         */
        @Override
        public void lock() {
            sync.acquireShared(1);
        }

AbstractQueuedSynchronizer#acquireShared
    /**
     * 在共享模式下获取锁,忽略线程中断。
     */
    public final void acquireShared(int arg) {
        // 尝试获取共享锁
        if (tryAcquireShared(arg) < 0) {
            // 再次获取共享锁
            doAcquireShared(arg);
        }
    }

ReentrantReadWriteLock#Sync
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;

        /**
         * 高 16 位记录写锁持有次数,低 16 位记录读锁持有次数
         */
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = 1 << Sync.SHARED_SHIFT;
        static final int MAX_COUNT      = (1 << Sync.SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << Sync.SHARED_SHIFT) - 1;

        /** 读锁被持有的计数值 */
        static int sharedCount(int c)    { return c >>> Sync.SHARED_SHIFT; }
        /** 写锁被持有的计数值 */
        static int exclusiveCount(int c) { return c & Sync.EXCLUSIVE_MASK; }

        /**
         * 每个线程的读锁保持计数器
         */
        static final class HoldCounter {
            int count;          // initially 0
            // Use id, not reference, to avoid garbage retention
            final long tid = LockSupport.getThreadId(Thread.currentThread());
        }

        /**
         * 持有读锁计数器的线程局部对象
         */
        static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
            @Override
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        /**
         * 当前线程持有的读锁计数器对象,在创建时初始化,当读锁释放时移除
         */
        private transient ThreadLocalHoldCounter readHolds;

        /**
         * 最近一个成功获取读锁的线程的读锁持有计数器
         */
        private transient HoldCounter cachedHoldCounter;

        /**
         * 第一个获取读锁的线程
         */
        private transient Thread firstReader;
        /**
         * 第一个获取读锁的线程持有读锁的计数值
         */
        private transient int firstReaderHoldCount;

        Sync() {
            readHolds = new ThreadLocalHoldCounter();
            setState(getState()); // ensures visibility of readHolds
        }


    /**
     * 非公平同步器
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        @Override
        boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        @Override
        boolean readerShouldBlock() {
            // 避免获取写锁的线程饥饿
            return apparentlyFirstQueuedIsExclusive();
        }
    }

AbstractQueuedSynchronizer#apparentlyFirstQueuedIsExclusive
    /**
     * 同步队列中第一个等待获取锁的线程,需要获取独占的写锁,则返回 true
     */
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        // 头结点、第二个节点都不为 null,节点处于独占模式,并且节点上有驻留线程,表示有线程在等待获取写锁。
        return (h = head) != null &&
                (s = h.next)  != null &&
                !s.isShared()         &&
                s.thread != null;
    }

ReentrantReadWriteLock#Sync#tryAcquireShared
        @Override
        @ReservedStackAccess
        protected final int tryAcquireShared(int unused) {
            // 读取当前线程
            final Thread current = Thread.currentThread();
            // 读取同步状态
            final int c = getState();
            /**
             * 1)如果写锁已经被线程持有,并且不是当前线程,则获取失败,返回 -1。
             */
            if (Sync.exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current) {
                return -1;
            }
            /**
             * 2)写锁未被任何线程持有,或写锁被当前线程持有。
             */
            // 读取读锁计数值
            final int r = Sync.sharedCount(c);
            /**
             * 获取读锁的线程是否应该被阻塞【同步队列第一个阻塞线程在等待获取写锁】
             * && 读锁的占用计数值 < 65535
             * && 比较设置读锁的新计数值
             */
            if (!readerShouldBlock() &&
                    r < Sync.MAX_COUNT &&
                    compareAndSetState(c, c + Sync.SHARED_UNIT)) {
                // 1)如果是第一个获取读锁的线程
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                    // 2)当前线程是第一个获取读锁的线程,并在重复获取读锁
                } else if (firstReader == current) {
                    // 累积读锁持有计数值
                    firstReaderHoldCount++;
                    // 3)当前线程不是第一个获取读锁的线程
                } else {
                    // 读取最近一个成功获取读锁的线程的读锁持有计数器
                    HoldCounter rh = cachedHoldCounter;
                    // 如果最近获取读锁的线程不是当前线程
                    if (rh == null ||
                            rh.tid != LockSupport.getThreadId(current)) {
                        // 获取当前线程的持有计数器
                        cachedHoldCounter = rh = readHolds.get();
                    } else if (rh.count == 0) {
                        readHolds.set(rh);
                    }
                    // 递增计数值
                    rh.count++;
                }
                return 1;
            }
            // 写锁已经被当前线程持有,正在获取读锁
            return fullTryAcquireShared(current);
        }

        /**
         * Full version of acquire for reads, that handles CAS misses
         * and reentrant reads not dealt with in tryAcquireShared.
         */
        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                // 读取同步状态
                final int c = getState();
                // 1)有线程持有写锁
                if (Sync.exclusiveCount(c) != 0) {
                    // 如果不是当前线程持有,则返回 -1
                    if (getExclusiveOwnerThread() != current)
                    {
                        return -1;
                        // else we hold the exclusive lock; blocking here
                        // would cause deadlock.
                    }
                // 2)写锁没有被任何线程持有
                } else if (readerShouldBlock()) {
                    // Make sure we‘re not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null ||
                                    rh.tid != LockSupport.getThreadId(current)) {
                                rh = readHolds.get();
                                // 读锁持有计数值为 0,则移除计数器
                                if (rh.count == 0) {
                                    readHolds.remove();
                                }
                            }
                        }
                        if (rh.count == 0) {
                            return -1;
                        }
                    }
                }
                // 读锁的获取计数值已经为最大值
                if (Sync.sharedCount(c) == Sync.MAX_COUNT) {
                    throw new Error("Maximum lock count exceeded");
                }
                // 比较更新读锁的计数值
                if (compareAndSetState(c, c + Sync.SHARED_UNIT)) {
                    // 1)当前线程是第一个获取读锁的线程
                    if (Sync.sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    // 2)当前线程是第一个获取读锁的线程,重复获取   
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                        }
                        if (rh == null ||
                                rh.tid != LockSupport.getThreadId(current)) {
                            rh = readHolds.get();
                        } else if (rh.count == 0) {
                            readHolds.set(rh);
                        }
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

AbstractQueuedSynchronizer#doAcquireShared
    /**
     * 在共享模式下获取锁,线程能响应中断
     */
    private void doAcquireShared(int arg) {
        // 创建一个共享节点
        final Node node = addWaiter(Node.SHARED);
        boolean interrupted = false;
        try {
            for (;;) {
                // 读取前置节点
                final Node p = node.predecessor();
                // 前置节点是头节点
                if (p == head) {
                    // 尝试获取锁
                    final int r = tryAcquireShared(arg);
                    /**
                     * 第一个等待获取写锁的线程已经成功获取写锁,并且已经使用完毕而释放,
                     * 当前线程成功获取读锁。
                     */
                    if (r >= 0) {
                        // 设置当前节点为新的头节点,并尝试将信号往后传播,唤醒等待获取读锁的线程
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                // 当前节点的驻留线程需要被阻塞,则阻塞当前线程
                if (AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(p, node)) {
                    interrupted |= parkAndCheckInterrupt();
                }
            }
        } catch (final Throwable t) {
            cancelAcquire(node);
            throw t;
        } finally {
            if (interrupted) {
                AbstractQueuedSynchronizer.selfInterrupt();
            }
        }
    }

AbstractQueuedSynchronizer#setHeadAndPropagate
    /**
     * 设置头节点,如果同步队列中有等待获取读锁的线程,则尝试唤醒
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            // 读取当前节点的后置节点,如果其为 null 或处于共享模式【等待获取读锁】
            final Node s = node.next;
            if (s == null || s.isShared()) {
                // 尝试释放后继节点
                doReleaseShared();
            }
        }
    }

AbstractQueuedSynchronizer#doReleaseShared
    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        for (;;) {
            // 读取头节点
            final Node h = head;
            // 头结点和尾节点不相等,表示有线程在等待获取锁
            if (h != null && h != tail) {
                // 读取头节点的同步状态
                final int ws = h.waitStatus;
                // 后继节点需要被唤醒
                if (ws == Node.SIGNAL) {
                    // 比较更新同步状态
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    {
                        // 更新失败则再次确认
                        continue;            // loop to recheck cases
                    }
                    // 同步状态成功更新为 0,则唤醒后继节点的驻留线程
                    unparkSuccessor(h);
                }
                // 同步状态为 0,并且比较更新为 PROPAGATE 失败,则继续循环
                else if (ws == 0 &&
                        !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                {
                    continue;                // loop on failed CAS
                }
            }
            /**
             * 1)后继节点被唤醒并且还未成功获取到锁,则直接退出循环,此时只唤醒了一个线程。
             * 2)被唤醒的后继节点成功获取到读锁,驻留线程已经被释放,此时头节点已经改变,则进行重试。
             */
            if (h == head) {
                break;
            }
        }
    }

读锁释放:ReadLock#unlock

        /**
         * 释放读锁,如果读锁的持有计数值为 0,则写锁可以被获取。
         */
        @Override
        public void unlock() {
            sync.releaseShared(1);
        }

        @Override
        @ReservedStackAccess
        protected final boolean tryReleaseShared(int unused) {
            // 读取当前线程
            final Thread current = Thread.currentThread();
            // 1)当前线程是获取读锁的第一个线程
            if (firstReader == current) {
                // 读锁持有计数为 1,则释放后为 0,
                if (firstReaderHoldCount == 1) {
                    firstReader = null;
                } else {
                    // 读锁被当前线程多次持有,则递减读锁持有计数值
                    firstReaderHoldCount--;
                }
            // 2)当前线程不是持有读锁的线程
            } else {
                // 最近持有读锁的线程计数值
                HoldCounter rh = cachedHoldCounter;
                if (rh == null ||
                        rh.tid != LockSupport.getThreadId(current)) {
                    // 读取当前线程的读锁持计数值对象
                    rh = readHolds.get();
                }
                final int count = rh.count;
                if (count <= 1) {
                    // 如果为 1,则移除线程局部对象
                    readHolds.remove();
                    if (count <= 0) {
                        throw Sync.unmatchedUnlockException();
                    }
                }
                // 递减计数值
                --rh.count;
            }
            // 原子更新同步状态值
            for (;;) {
                final int c = getState();
                final int nextc = c - Sync.SHARED_UNIT;
                if (compareAndSetState(c, nextc)) {
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
                }
            }
        }

写锁获取:WriteLock#lock

ReentrantReadWriteLock#Sync
        @Override
        @ReservedStackAccess
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 读取同步状态值
            final int c = getState();
            // 写锁被持有的计数值
            final int w = Sync.exclusiveCount(c);
            // 1)读锁或写锁至少有一个被线程持有
            if (c != 0) {
                /**
                 * 2)读锁被线程持有,或当前线程不是写锁持有线程。
                 * 目标线程不允许先获取读锁,后获取写锁,即 ReentrantReadWriteLock 不支持锁升级。
                 */
                if (w == 0 || current != getExclusiveOwnerThread()) {
                    // 尝试获取失败
                    return false;
                }
                // 写锁重入次数超出最大值
                if (w + Sync.exclusiveCount(acquires) > Sync.MAX_COUNT) {
                    throw new Error("Maximum lock count exceeded");
                }
                // 更新同步状态,写锁获取成功
                setState(c + acquires);
                return true;
            }
            // 读锁和写锁都未被线程持有,则原子更新同步状态
            if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires)) {
                return false;
            }
            // 设置写锁的独占线程为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }

写锁释放:WriteLock#unlock

        /**
         * 释放锁,写锁已经被释放,则返回 true
         */
        @Override
        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            // 写锁是否被当前线程持有
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            // 计算同步状态值,写锁计数值保存在低 16 位
            final int nextc = getState() - releases;
            // 新的写锁计数值为 0,则表示读写锁已经自由了
            final boolean free = Sync.exclusiveCount(nextc) == 0;
            if (free) {
                // 清空独占锁持有线程
                setExclusiveOwnerThread(null);
            }
            setState(nextc);
            return free;
        }

以上是关于ReentrantReadWriteLock 源码分析的主要内容,如果未能解决你的问题,请参考以下文章

ReentrantReadWriteLock源码分析

ReentrantReadWriteLock源码分析

Java多线程——ReentrantReadWriteLock源码阅读

Java多线程—ReentrantReadWriteLock源码阅读

ReentrantReadWriteLock 源码分析

[源码分析]读写锁ReentrantReadWriteLock