源码分析:升级版的读写锁 StampedLock

Posted 精灵王

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码分析:升级版的读写锁 StampedLock相关的知识,希望对你有一定的参考价值。

简介

StampedLock 是JDK1.8 开始提供的一种锁, 是对之前介绍的读写锁 ReentrantReadWriteLock 的功能增强。StampedLock 有三种模式:Writing(读)、Reading(写)、Optimistic Reading(乐观度),StampedLock 的功能不是基于AQS来实现的,而是完全自己内部实现的功能,不支持重入。在加锁的时候会返回一个戳,解锁的时候需要传入,匹配完成解锁操作。

官方使用示例

class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    void move(double deltaX, double deltaY) // an exclusively locked method
        // 写锁-独占资源
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    double distanceFromOrigin() // A read-only method
        // 只读的方法,比较乐观,认为读的过程中不会有写,所以这里是乐观度
        long stamp = sl.tryOptimisticRead();
        double currentX = x, currentY = y;
        if (!sl.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
            // 获取一个普通的读锁
            stamp = sl.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                // 释放读锁
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    void moveIfAtOrigin(double newX, double newY) // upgrade
        // Could instead start with optimistic, not read mode
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                // 普通读锁转换成写锁,返回0为转换失败
                long ws = sl.tryConvertToWriteLock(stamp);
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                }  else {
                    sl.unlockRead(stamp);
                    stamp = sl.writeLock();
                }
            }
        } finally {
            sl.unlock(stamp);
        }
    }
}

官方demo中用到的 api 主要有获取写锁(writeLock())、释放写锁(unlockWrite(stamp))、获取普通读锁(readLock())、释放普通读锁(unlockRead(stamp))、获取乐观读锁(tryOptimisticRead())、检测乐观读版本(validate(stamp))、普通读锁转换成写锁(tryConvertToWriteLock(stamp))。下面分析源码的时候也主要根据这几个方法来分析。

源码分析

主要内部类

  1. 等待节点:WNode
    用于维护 CLH 队列的节点,源码如下:

    static final class WNode {
        volatile WNode prev;  // 前驱节点
        volatile WNode next;  // 后继节点
        volatile WNode cowait;    // 链接的读者列表
        volatile Thread thread;   // 线程
        volatile int status;      // 状态 0, WAITING, or CANCELLED
        final int mode;           // 两种模式:RMODE or WMODE
        WNode(int m, WNode p) { mode = m; prev = p; }
    }
  2. ReadWriteLockView:实现了ReadWriteLock接口,提供了读写锁获取接口

    final class ReadWriteLockView implements ReadWriteLock {
        public Lock readLock() return asReadLock(); }
        public Lock writeLock() return asWriteLock(); }
    }
  3. ReadLockView 和 WriteLockView:都实现了Lock接口,并实现了所有的方法

主要属性

  1. CLH 队列

    /** Head of CLH queue */
    private transient volatile WNode whead;
    /** Tail (last) of CLH queue */
    private transient volatile WNode wtail;
  2. 其他常量属性

    /** CPU的核心数量,用来控制自旋的次数 */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();
    /** 获取锁入队前最大重试次数:CPU核心数大于1:64,否则0 */
    private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
    /** 等待队列的头结点,获取锁最大重试次数:CPU核心数大于1:1024,否则0 */
    private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
    /** 再次阻塞前最大重试次数:CPU核心数大于1:65536,否则0  */
    private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
    /** The period for yielding when waiting for overflow spinlock */
    private static final int OVERFLOW_YIELD_RATE = 7// must be power 2 - 1

    /** 用于读取器计数的位数 */
    private static final int LG_READERS = 7;

    // 用来计算state值的常量
    private static final long RUNIT = 1L;  // 读单位
    // 写锁的标识位 十进制:128  二进制位标示:1000 0000 
    private static final long WBIT  = 1L << LG_READERS;  
    // 读状态标识 admol 十进制:127  二进制: 0111 1111
    private static final long RBITS = WBIT - 1L;   
    // 读锁的最大标识  十进制:126 二进制 :0111 1110 
    private static final long RFULL = RBITS - 1L;  
    // 用来读取读写状态  十进制:255 二进制:1111 1111
    private static final long ABITS = RBITS | WBIT; 
    // ~255 ==  11111111111111111111111111111111111111111111111111111111 1000 0000
    // -128
    private static final long SBITS = ~RBITS; 

    // 同步状态state的初始值 256  二进制:0001 0000 0000
    private static final long ORIGIN = WBIT << 1;

    // 中断
    private static final long INTERRUPTED = 1L;

    // 节点的状态
    private static final int WAITING   = -1;
    private static final int CANCELLED =  1;

    // 节点的模式 
    private static final int RMODE = 0;
    private static final int WMODE = 1;

    /** 同步状态 初始值 256 0001 0000 0000*/
    private transient volatile long state;
    /** 读计数饱和时的额外读取器计数 */
    private transient int readerOverflow;

    StampedLock 虽然没有继承AQS,但是属性上很相似,都有一个CLH队列,和一个同步状态值state, StampedLock用8位来表示读写锁状态,前7位是用来标识读锁状态的,第8位标识写锁占用,如果读锁数量超过了126(0111 1110 ),超出的用readerOverflow来计数。

构造方法

public StampedLock() {
   // 初始值 256,  二进制:0001 0000 0000
    state = ORIGIN;
}

获取写锁:writeLock()

源码展示:

public long writeLock() {
    long s, next;  // bypass acquireWrite in fully unlocked case only
    return ((((s = state) & ABITS) == 0L &&  U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? next : acquireWrite(false0L));
}

代码分析:

  1. 首先执行的是 ((s = state) &amp; ABITS)== 0L,用来表示读锁和写锁是否可以被获取
    解析:第一次时,state 初始值是256,ABITS是255,计算过程:0001 0000 0000 && 0000 1111 1111 ,结算结果为0。

  2. 如果CAS执行失败,则执行acquireWrite(false, 0L) ,进入等待队列获取锁

acquireWrite 代码分析:

// interruptible 是否要检查中断
// deadline:0 一直等待获取锁
private long acquireWrite(boolean interruptible, long deadline) {
    // node:即将入队排队的节点
    // p:当前排队节点入队之前的尾节点
    WNode node = null, p;
    // 第一次自旋:排队节点入队列自旋
    for (int spins = -1;;) { // spin while enqueuing
        long m, s, ns;
        // 这个if 和外面一样的,从代码运行到这期间所有没有被释放
        if ((m = (s = state) & ABITS) == 0L) {
            // CAS 再次尝试获取下写锁
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                // 成功获取写锁
                return ns;
        }  else if (spins < 0)  // 走到这,说明上面还是没获取到写锁,写锁被占用了,m的值为128
            // 1. 确定自旋的次数 spins: 64 or 0 
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
            // 2.自旋的次数大于0,随机减一次自旋次数,直到减到spins为0(by.精灵王 这里实际是空转,没什么特点的逻辑处理)
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        } else if ((p = wtail) == null) { // initialize queue
            // 3.自旋spins减到0后会立马执行到这里
            // p被赋值为尾节点  
            // 初始化队列, WMODE:写,null:前驱节点
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                // 初始化队列时,尾节点等于头节点
                wtail = hd;
        } else if (node == null)
            // 4.初始化队列后,下一次自旋,构建当前排队节点,并指定了其尾节点
            node = new WNode(WMODE, p);
        else if (node.prev != p) // 如果当前节点的前驱不是尾节点
            // 5.前驱节点设置为之前队列的尾节点
            node.prev = p;
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
            // 6. CAS 更新尾节点为当前排队的节点 
            p.next = node;
            // 退出自旋
            break;
        }
    }
    // 第二次自旋
    for (int spins = -1;;) {
        WNode h, np, pp; int ps; 
        if ((h = whead) == p) { // 如果头节点和之前的尾节点p是同一个, 说明马上应该轮到node节点获得锁
            if (spins < 0)
                // ① 设置自旋次数  1024 or 0
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                // 第一次自旋1024次没有获取到锁,这次自旋翻倍
                // 自旋次数*2  2048   继续进入到下面的自旋
                spins <<= 1;
            for (int k = spins;;) { // spin at head
                // ② 第三次自旋,不断尝试获得锁(自旋1024或者2048次),直到成功获得锁 或者 break
                long s, ns;
                // ((s = state) & ABITS) == 0L  表示锁没有被占用
                if (((s = state) & ABITS) == 0L) {
                    // CAS 修改state值
                    if (U.compareAndSwapLong(this, STATE, s,  ns = s + WBIT)) {
                        // CAS 修改成功获得锁,设置新的头结点 
                        whead = node;
                        node.prev = null;
                        return ns;
                    }
                } else if (LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                    // 随机立减自旋次数  自旋次数为0时跳出自旋循环
                    break;
            }
        } else if (h != null) { // help release stale waiters
            // 头节点不为空
            // 进入情景:写锁被获取,队列中很多等待获取读锁的线程,写锁释放,读锁被唤醒后可能进入到这里
            WNode c; Thread w;
            while ((c = h.cowait) != null) { // 自旋
                // 头节点的 cowait不为空
                // h.cowait 修改成节点的下一个cowait
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
                    // 唤醒 cowait 里面的线程
                    U.unpark(w);
            }
        }
        if (whead == h) { // 如果头结点没有变化
            // P 是之前的尾节点
            if ((np = node.prev) != p) { 
                // != 之前的尾节点,也就是说当前节点的前驱节点不是尾节点时
                if (np != null)
                    // 保存尾节点和当前节点的连接关系
                    (p = np).next = node;   // stale
            }
            else if ((ps = p.status) == 0)
                // ③ 上面第三次自旋break后会进入到这里,修改尾节点状态
                // 更新尾节点的状态为WAITING:-1, 然后继续回到第二次自旋的地方,重新开始自旋
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                // p节点状态是取消,则删除p节点
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            } else {
                // 超时时间
                long time; // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    // 设置了超时时间 已经超时,取消当前node节点
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                // 为给定地址设置值,忽略修饰限定符的访问限制,与此类似操作还有: putInt,putDouble,putLong,putChar等
                U.putObject(wt, PARKBLOCKER, this);
                // node节点指向当前线程
                node.thread = wt;
                // p.status < 0 的只有-1 也就是WAITING 
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p)
                    // 阻塞当前线程
                    U.park(false, time);  // emulate LockSupport.park
                // 线程被唤醒后,清除节点的线程
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted()) // 要检查中断 && 线程有被中断
                    // 取消当前node节点
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

获取写锁过程总结:

  1. 首先检查锁有没有被占用

    1. 没有被占用,尝试 CAS 修改state值,CAS 修改成功则获得锁,返回新的 state 值。

    2. CAS 修改失败的话进入下面自旋的逻辑

  2. 第一层自旋(节点入队):

    1. 先检查下锁有没有被占用(m = (s = state) & ABITS) == 0L, CAS 尝试一下获取锁,获取失败再继续自旋

    2. 入队之前会自旋64次(CPU核心数大于1),期间不做任何处理

    3. 初始化排队队列的队头队尾节点,当前节点加入到队尾,CAS 更新尾节点,更新成功则退出第一次自旋

  3. 开始第二层自旋(尝试获取锁,阻塞线程),第二层自旋和第三层自旋嵌套执行的:

    • 这里其实就是自旋和park线程之间性能的一个权衡,马上就要获得锁了,是自旋还是阻塞线程继续等,这里选择了先自旋1024次,如果没有获得锁,继续自旋2048次,如果还是没获得锁,则退出第三层自旋,回到第二层自旋,准备阻塞当前线程。

    1. 初始化第三层自旋次数(第一次1024,第二次2048),开启第三层自旋

    2. 位运算检查锁是否有被释放((s = state) & ABITS) == 0L),CAS 修改 state 值,修改成功,退出,返回新的state值

    3. 如果头节点和之前的尾节点p还是是同一个(没有其他获取锁的节点排队已经入队), 说明马上应该轮到node节点获得锁(排队的只有node节点)。

    4. 如果排队的头结点不为空,检查头结点的cowait 链表,如果不为空,自旋 CAS 修改头节点的cowait, 尝试唤醒整个链的节点线程

    5. 第三层自旋完成后还是没有获取到锁,阻塞当前线程,等待被唤醒,被唤醒后继续第二层自旋获取锁,重复这个过程,直到获取锁成功推出。

释放写锁:unlockWrite(stamp)

public void unlockWrite(long stamp) {
    WNode h;
    // state != stamp  检查解锁与加锁的版本是否匹配
    // (stamp & WBIT) == 0L 为true的话说明锁没有被占用
    if (state != stamp || (stamp & WBIT) == 0L)
        // 抛出异常
        throw new IllegalMonitorStateException();
    // 释放写锁,会增加state的版本
    // stamp += WBIT 等于二进制(第一次加写锁和解锁) 0001 1000 0000 + 0000 1000 0000 == 0010 0000 0000
    // 解锁会把stamp 的二进制第8位设置为0
    // 相当于重新赋值state值
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
    if ((h = whead) != null && h.status != 0// 头结点不为 && 状态不为初始状态0(一般是WAITING -1),说明队列中有排队获取锁的线程
         // 唤醒头节点的后继节点
         release(h);
}
private void release(WNode h) {
    if (h != null) {
        // q节点:头节点的有效后继节点
        // w: 需要唤醒的线程
        WNode q; Thread w;
        // 将头节点的状态设置成0
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        if ((q = h.next) == null || q.status == CANCELLED) { // 如果头节点的后继为空 或者 是取消状态
            // 就从排队的队尾找一个有效的节点
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        // 找到了有效的节点,唤醒其线程
        if (q != null && (w = q.thread) != null)
            U.unpark(w);
    }
}

释放写锁过程总结:

  1. 检查锁印章戳是否匹配,锁是否有被占用,检查不通过抛出异常

  2. 通过位运算stamp += WBIT计算新的state值,state 二进制位的第8位会被设置成0就是写锁解锁

  3. 检测队列中是否有排队获取锁的线程

    1. 唤醒下一个等待获取锁的线程(`unpark(thread)`)

获取普通读锁:readLock()

public long readLock() {
    long s = state, next;  // bypass acquireRead on common uncontended case
    // 在没有线程获得锁的情况下,s的初始值是256 
    // whead == wtail 为true:表示队列为空
    // (s & ABITS) < RFULL: 已获取读锁的数小于最大值126
    return ((whead == wtail && (s & ABITS) < RFULL &&  U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) 
                    ? next : acquireRead(false0L));
}

代码解析:

  1. 队列为空 && 已获取读锁次数小于126 && CAS 修改 state 值

    1. 条件完全成立,成功获得锁,返回新的state值

  2. 没有成功,进入到acquireRead(false, 0L)方法排队获取锁

acquireRead源码展示(代码超100行,需耐心观看):

private long acquireRead(boolean interruptible, long deadline) {
        // p节点为尾节点  node为入队节点
        WNode node = null, p;
        // 第一层大循环 第一次自旋,是不是和获取写锁的很像?
        for (int spins = -1;;) {
            WNode h;
            if ((h = whead) == (p = wtail)) { // 首尾节点相等,说明队列为空,有线程在排队不会进入if
                // 前面没获取到锁,队列又为空,是不是应该马上就是当前线程获取锁了?
                // 第二次自旋 自旋64次 目的是为了看马上能不能获取锁(排队队列为空,没线程排队时,会在这里自旋获取锁)
                for (long m, s, ns;;) {
                    // 这里是个三目运算,代码太长,拆开来看
                    // (m = (s = state) & ABITS) < RFULL;和进入readLock()方法时的条件一样,判断读锁的数是否达到最大值,只有写锁被获取,这里就是false
                    // 我们假设前面写锁被获取了,现在获取读锁,m 就是128,大于RFULL 126
                    // 没有达到最大值, CAS 修改状态值 U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT)
                    // 超过最大值了,记得前面那个readerOverflow属性不?在tryIncReaderOverflow这累加   
                    if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                        return ns;
                    else if (m >= WBIT) {// if条件成立,说明说明被占用
                        // 
                        if (spins > 0) {
                            if (LockSupport.nextSecondarySeed() >= 0)
                                --spins; // 随机减自旋次数
                        } else {
                            if (spins == 0) { // 自旋次数减到0了,还没获取到读锁
                                WNode nh = whead, np = wtail;
                                if ((nh == h && np == p) || (h = nh) != (p = np))
                                    break// 退出自旋
                            } 
                            spins = SPINS; // 初始自旋次数 64次
                        }
                    }
                }
               // 上面到这里都是在处理队列为空,马上要获取到锁的情况
            } 
                        if (p == null) { // 尾节点为空,初始化排队队列,有线程在排队时不会进入到这里
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd)) // CAS 设置头节点
                    // 运行到这里后,会回到第一次的自旋,再次进入到第二次自旋,这次 spins 为0,只会自旋一次
                    wtail = hd;
            } else if (node == null)
                // 初始化当前入队排队节点,有线程在排队时,直接进入到这里,然后继续第一次自旋
                node = new WNode(RMODE, p); 
            else if (h == p || p.mode != RMODE) { // 队列为空 或者 尾节点不是读模式
                // 排队节点入队
                if (node.prev != p)
                    node.prev = p; // 设置排队节点的前驱节点
                    // 继续第一次自旋     
                else if (U.compareAndSwapObject(this, WTAIL, p, node)) {  // CAS 修改尾节点
                    p.next = node; // 老的尾节点的后继节点为当前节点
                    // 进入到这里会退出第一层自旋, 直接进入到下面第二层的大的自旋
                    break;
                }                
            } else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node))
                // 上面那个if分支进不去,只有条件:队列不为空 and  尾节点是读模式 为真
                // 进入到了这里,说明CAS失败
                // 这里的CAS 就是把当前节点加入到尾节点的cowait栈里面
                // 从这里可以看出加入的顺序是个栈结构,先把旧的尾节点的cowait赋值给node节点的cowait,然后再把node节点赋值给尾节点
                node.cowait = null;
            else { 
                // 进入到这,说明上面的if分支都没有进去,尾节点不为空,当前节点不为空,队列不为空,尾节点是读模式,上面CAS修改成功
                // 总结一下进入到这里的条件就是,有个线程获得了写锁还没释放,队列中有读线程在排队
                // 第三次自旋,有线程在排队获取锁时,会进入到这里自旋
                for (;;) {
                    WNode pp, c; Thread w;
                    if ((h = whead) != null && (c = h.cowait) != null &&
                        U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null// help release
                        // 头节点不为空,且其cowait节点不为空,唤醒整个cowait栈的线程
                        U.unpark(w);
                    if (h == (pp = p.prev) || h == p || pp == null) {
                        // 头节点等于尾节点的前驱节点 或者头节点等于尾节点 或者 尾节点的前驱节点为空
                        // 说明还是马上轮到自己获得锁       
                        long m, s, ns;
                        do {
                            // 判断是否可以使用CAS获取读锁             
                            if ((m = (s = state) & ABITS) < RFULL ?  
                                U.compareAndSwapLong(this, STATE, s,  ns = s + RUNIT) :
                                (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                                return ns;
                        } while (m < WBIT); // m < WBIT时表示写锁没有被占用,一直尝试获取锁
                    }
                    if (whead == h && p.prev == pp) { // 对头没有发生变化 ,队尾也没发生变化
                        long time;
                        if (pp == null || h == p || p.status > 0) { // 队尾的前驱节点为空或者 头节点等于尾节点 或者 老的尾节点被取消(>0的状态只有1,取消)
                            node = null// 抛弃当前节点,退出当前循环,回到第一层的自旋,重新构建节点
                            break;
                        }
                        if (deadline == 0L)
                            time = 0L;
                        else if ((time = deadline - System.nanoTime()) <= 0L// 超时
                            return cancelWaiter(node, p, false); // 取消节点
                        Thread wt = Thread.currentThread();
                        U.putObject(wt, PARKBLOCKER, this);
                        node.thread = wt;
                        if ((h != pp || (state & ABITS) == WBIT) &&
                            whead == h && p.prev == pp)
                            U.park(false, time); // 阻塞当前线程
                        // 线程被唤醒,开始继续自旋获取锁  
                        node.thread = null;
                        U.putObject(wt, PARKBLOCKER, null);
                        if (interruptible && Thread.interrupted())
                            return cancelWaiter(node, p, true);
                    }
                }
            }
        }
        // 第二层大的循环 
        for (int spins = -1;;) {
            WNode h, np, pp; int ps;
            if ((h = whead) == p) {  // 如果队列为空,说明马上轮到当前线程获得锁了
                // 这个大的if 里面做的就是获取锁
                if (spins < 0)
                    // 初始化本次自旋获取锁的次数:1024次
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    // 上面1024次自旋没有获取到锁,就自旋翻倍:2048次,继续下面的自旋
                    spins <<= 1;
                // 开始自旋获取锁
                for (int k = spins;;) { // spin at head
                    long m, s, ns;
                    // 这个if条件 检查了是否可以获取锁,如果可以就CAS获取锁
                    if ((m = (s = state) & ABITS) < RFULL ?
                        U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                        // 进入到这个if里面,说明就获得了锁
                        WNode c; Thread w;
                        // 这里的node节点是还没有绑定thread的
                        whead = node;
                        node.prev = null;
                        // 要唤醒当前node节点中的所有cowait节点线程
                        // 当前节点是在上面的第一层大的自旋入队的,其他获取读锁的节点都是挂在这个节点的cowait下的
                        while ((c = node.cowait) != null) {
                            if (U.compareAndSwapObject(node, WCOWAIT,
                                                       c, c.cowait) &&
                                (w = c.thread) != null)
                                U.unpark(w); // 唤醒线程
                        }
                        return ns;
                    }  else if (m >= WBIT &&  LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                        // 上面没有获取到锁,自旋减次数,直到为0,退出自旋
                        break;
                }
            } else if (h != null) { // 队列不为空,头节点不为空
                WNode c; Thread w;
                while ((c = h.cowait) != null) {
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            // 运行到这,说明还没获取到锁
            if (whead == h) { // 头节点没变过
                if ((np = node.prev) != p) { // 检查节点的链接关系
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                else if ((ps = p.status) == 0)
                    // 检查尾节点的状态,为0则更新成-1,回到第二层大的循环开始处
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) { // p节点被取消,删除这个节点
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                } else { // 上面2048次自旋后还是没获取到锁,进入到最终阻塞线程的环节
                    long time;
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false); // 超时了就取消当前节点
                    Thread wt = Thread.currentThread(); // 当前线程
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p)
                        U.park(false, time); // 阻塞线程
                    // 线程被唤醒了,继续执行 第二层大的自旋获取锁
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted()) // 被唤醒了,发现要求中断线程 并且线程被中断了
                        return cancelWaiter(node, node, true); // 取消当前节点
                }
            }
        }
    }

获取普通读锁总结:

  • 位运算检查写锁是否被占用,读锁是否超限制

    1. 满足条件,直接CAS 修改state值,并返回新的state值

  • 开始第一层大的自旋,第一层大的自旋里面包含了两种情况不同的自旋:

    • 这种情况,会不断尝试获取锁,阻塞线程,等待被唤醒,一直在这个自旋里面,直到获得锁,或者超时中断被取消,不会进入到第二层大的自旋

    • 这种情况说明,锁虽然已经被占用,但是马上就应该是我得到锁了,所以我先在这儿自旋(64次)等等你释放锁,免得阻塞我自己,之后唤醒还需要成本

    • 自旋没有获取到锁,会退出第一层大的自旋,进入到第二层大的自旋

    1. 第一种自旋情况:排队的队列为空,没有其他线程在排队等待锁时

    2. 第二种自旋情况:写锁被占用,排队的队列不为空,队尾是读模式时

  • 第二层大的自旋

    • 首先会尝试自旋1024次获得锁,如果前面还没释放锁,再自旋2048次

    • 这一层的自旋是对第一层里面第一种自旋情况(马上轮到我获得锁,但是前面持有锁的线程就是不释放)的补充,因为没有线程在排队,只要前面的线程释放了锁,马上就可以获得锁了,所以这一层还是在自旋获得锁,只不过自旋次数有增加

    • 如果2048次之后还是没有等到前面的锁释放,就阻塞当前线程,等待被唤醒,直到获得锁,或者超时中断被取消

cowait栈分析

下面代码是main线程先获取写锁不释放,之后T0,T1,T2,T3线程先后去获取读锁,最后断点观察整个排队队列的情况

StampedLock sl = new StampedLock();
long stamp = sl.writeLock();
// 先让T0线程去排队到尾节点
TimeUnit.SECONDS.sleep(1);
new Thread(new Runnable(){
    @SneakyThrows
    @Override
    public void run(){
        long stamp = sl.readLock();
        System.out.println(stamp + "   x");
    }
},"T0").start();

// 之后T1线程来获取读
TimeUnit.SECONDS.sleep(3);
new Thread(new Runnable(){
    @SneakyThrows
    @Override
    public void run(){
        long stamp = sl.readLock();
        System.out.println(stamp + "   x");
    }
},"T1").start();
TimeUnit.SECONDS.sleep(3);
new Thread(new Runnable(){
    @SneakyThrows
    @Override
    public void run(){
        long stamp = sl.readLock();
        System.out.println(stamp + "   x");
    }
},"T2").start();
TimeUnit.SECONDS.sleep(3); // 在这里先断点,进入到这里后,再到源码位置去断点,就可以看到如下图的情况了
new Thread(new Runnable(){
    @SneakyThrows
    @Override
    public void run(){
        long stamp = sl.readLock();
        System.out.println(stamp + "   x");
    }
},"T3").start();

运行代码后,断点截图:

Untitled

他们的关系可以用如下表示,横向是链表,纵向是cowait栈。

cowait栈

释放读锁:unlockWrite(stamp)

public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) { // 自旋
        if (((s = state) & SBITS) != (stamp & SBITS) || (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            // 检查版本
            throw new IllegalMonitorStateException();
        if (m < RFULL) {  // 锁标识小于读锁的最大标识
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { // CAS 更新state值
                if (m == RUNIT && (h = whead) != null && h.status != 0// 头结点不为空
                    release(h); // 唤醒下一个节点
                break;
            }
        } else if (tryDecReaderOverflow(s) != 0L
            // 读锁个数饱和溢出,尝试减少readerOverflow
            break;
    }
}
private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        if ((q = h.next) == null || q.status == CANCELLED) {
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        if (q != null && (w = q.thread) != null)
            U.unpark(w);
    }
}

释放读锁的逻辑也比较简单,和释放写锁的逻辑很相识,唤醒下一个节点的release方法也完全一致

获取乐观读锁:tryOptimisticRead()

public long tryOptimisticRead() {
    long s;
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

乐观读锁的逻辑也比较简单,就一个三目运算,((s = state) & WBIT) == 0L 就是看写锁是否有被占用,写锁被占用返回0,否则返回写锁没被占用的包含高位版本有效戳(也就是写锁的版本)。

检测乐观读版本:validate(stamp)

public boolean validate(long stamp) {
    // 插入内存屏障,禁止load操作重排序。
    // 由于StampedLock提供的乐观读锁不阻塞写线程获取读锁,当线程共享变量从主内存load到线程工作内存时,会存在数据不一致问题
    // 解决锁状态校验运算发生重排序导致锁状态校验不准确的问题
      U.loadFence(); 
      return (stamp & SBITS) == (state & SBITS);
}

返回true:表示期间没有写锁发生,读锁为所谓

返回false:表示期间有写锁发生

那这里是怎么计算的呢?

SBITS为-128,用二进制表示是:1111 1111 1111 1000 0000

两种情况:

  1. 假如先获取乐观锁,再获取读锁;
    乐观锁返回的stamp为256,二进制位是 0001 0000 0000;
    获取读锁之后state值是257,二进制位是 0001 0000 0001;
    它们分别于与-128 进行与运算后都是0001 0000 0000,也就是十进制256,返回true;

  2. 假如先获取乐观锁,再获取写锁;
    乐观锁返回的stamp为256,二进制位是 0001 0000 0000;
    获取写锁之后state值是384,二进制位是 0001 1000 0000;
    它们分别与-128 进行与运算后,相当与 256 == 384,结果肯定返回false;

普通读锁转换成写锁:tryConvertToWriteLock(stamp)

public long tryConvertToWriteLock(long stamp) {
    // m标识最新的锁标识
    // a标识被转换的锁的锁标识
    long a = stamp & ABITS, m, s, next;
    while (((s = state) & SBITS) == (stamp & SBITS)) { // 检查锁持有状态
        if ((m = s & ABITS) == 0L) {
            if (a != 0L)
                break;
            if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
                return next;
        } else if (m == WBIT) { // 写锁已经被占用
            if (a != m)
                break;
            return stamp;   // 说明被转换前就是写锁
        } else if (m == RUNIT && a != 0L) { // 被转换前的是普通读锁,写锁没被占用
            if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT + WBIT))
                // s:是之前的锁状态
                // s - RUNIT:就是释放读锁
                //  + WBIT :就是加写锁(进入之前写锁没被占用)
                return next; // 返回最新的锁状态
        } else
            break;  // 其他情况,全部返回0,转换失败
    }
    return 0L// 返回0,标识转换写锁失败
}

普通读锁转换成写锁过程总结:

  1. 如果转换前是写锁,直接返回写锁

  2. 如果转换前是读锁,转换期间,写锁被占用,返回0,转换失败

  3. 如果转换前是读锁,写锁没有被占用,释放读锁,加写锁,返回写锁,转换成功

  4. 其他情况,全部返回0,转换失败

StampedLock 总结

  1. StampedLock 是一种支持乐观读锁的高级版读写锁

  2. StampedLock 没有使用AQS 同步框架,而是完全自己实现的同步状态state 和 CLH队列维护算法

  3. 同步状态state的低7位标识读锁的数量,第8位标识写锁是否被占用,高24位记录写锁的版本,每次释放写锁会版本位置会加1

  4. 写锁每次获取state会加128,释放也会加128,读锁是加减1

  5. StampedLock 的连续多个读锁线程,只有第一个是在队列上,后面的读线程都存在第一个线程的cowait栈结构上

  6. StampedLock 唤醒一个读锁线程后,读锁线程会唤醒所有在它cowait栈上的等待读锁线程

  7. StampedLock 用到了大量的自旋操作,适合持有锁时间比较短的任务,持有锁时间长的话等待的线程自旋后还是会阻塞自己。

  8. StampedLock 同一个线程先获取读锁,再获取写锁也会死锁

  9. StampedLock 写锁不支持重入,读锁支持重入

  10. StampedLock 不支持条件锁

  11. StampedLock 不支持公平锁,上来有条件就 CAS 尝试获得锁

StampedLock 与 ReentrantReadWriteLock的区别总结

使用功能上的区别:

  1. StampedLock 支持乐观读锁,RRWL 没有

  2. StampedLock 支持锁转换,tryConvertToXXXXX(stamp)

  3. StampedLock 写锁不支持重入,RRWL 支持重入

  4. StampedLock 不支持条件锁,RRWL 支持条件锁

  5. StampedLock 不支持公平锁,RRWL 支持公平锁

底层实现的区别:

  1. StampedLock 没有使用同步框架AQS,RRWL 是基于AQS 来实现排队、阻塞、唤醒等功能的

  2. StampedLock 获取锁时,会直接使用CAS尝试获得锁(不公平,不看排队),会根据CPU核心数来决定自旋次数等待获取锁

  3. StampedLock 的 CLH 队列中连续的读线程只有首个节点存储在队列中,后面的节点都存储的首个节点的cowait栈中,即 1→5→4→3→2→1 这种顺序。

  4. StampedLock 中同步状态 state 被分成了三部分,第8位记录的是写锁的状态,低7位记录读锁的次数,其他位记录的是写锁的版本

  5. RRWL 中同步状态 state 被分成两部分,高16位记录读锁次数,低16位记录写锁次数

  6. StampedLock 唤醒一个读锁线程后,读线程会唤醒所有在它cowait栈上的等待读锁线程


以上是关于源码分析:升级版的读写锁 StampedLock的主要内容,如果未能解决你的问题,请参考以下文章

JUC之StampedLock读写锁增强辅助类

死磕 java同步系列之StampedLock源码解析

Java并发工具类StampedLock:比读写锁更快的锁

Java8 读写锁的改进:StampedLock(笔记)

读写锁StampedLock的思想

通俗易懂的JUC源码剖析-StampedLock