Java并发编程之StampedLock锁源码探究
Posted 妮蔻
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程之StampedLock锁源码探究相关的知识,希望对你有一定的参考价值。
StampedLock是JUC并发包里面JDK1.8版本新增的一个锁,该锁提供了三种模式的读写控制,当调用获取锁的系列函数的时候,会返回一个long 型的变量,该变量被称为戳记(stamp),这个戳记代表了锁的状态。
try系列获取锁的函数,当获取锁失败后会返回为0的stamp值。当调用释放锁和转换锁的方法时候需要传入获取锁时候返回的stamp值。
StampedLockd的内部实现是基于CLH锁的,CLH锁原理:锁维护着一个等待线程队列,所有申请锁且失败的线程都记录在队列。一个节点代表一个线程,保存着一个标记位locked,用以判断当前线程是否已经释放锁。当一个线程试图获取锁时,从队列尾节点作为前序节点,循环判断所有的前序节点是否已经成功释放锁。
如下图所示:
我们首先看Stampedlock有哪些属性先,源码如下:
private static final long serialVersionUID = -6001602636862214147L; /** 获取服务器CPU核数 */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); /** 线程入队列前自旋次数 */ private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0; /** 队列头结点自旋获取锁最大失败次数后再次进入队列 */ private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0; /** 重新阻塞前的最大重试次数 */ private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0; /** The period for yielding when waiting for overflow spinlock */ private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1 /** 溢出之前用于阅读器计数的位数 */ private static final int LG_READERS = 7; // 锁定状态和stamp操作的值 private static final long RUNIT = 1L; // 每次获取读锁 进行+1 private static final long WBIT = 1L << LG_READERS; //写状态 1000 0000 private static final long RBITS = WBIT - 1L; //溢出保护 0111 1111 private static final long RFULL = RBITS - 1L; //最大读线程数 0111 1110 private static final long ABITS = RBITS | WBIT; // 掩码 前8位都为1 1111 1111 private static final long SBITS = ~RBITS; // 掩码 1... 1000 0000 //锁state初始值,第9位为1,避免算术时和0冲突 private static final long ORIGIN = WBIT << 1; // 来自取消获取方法的特殊值,因此调用者可以抛出IE private static final long INTERRUPTED = 1L; // WNode节点的status值 private static final int WAITING = -1; private static final int CANCELLED = 1; // WNode节点的读写模式 private static final int RMODE = 0; private static final int WMODE = 1; /** Wait nodes */ static final class WNode { volatile WNode prev; volatile WNode next; volatile WNode cowait; // 读模式使用该节点形成栈 volatile Thread thread; // non-null while possibly parked volatile int status; // 0, WAITING, or CANCELLED final int mode; // RMODE or WMODE WNode(int m, WNode p) { mode = m; prev = p; } } /** CLH队头节点 */ private transient volatile WNode whead; /** CLH队尾节点 */ private transient volatile WNode wtail; // views transient ReadLockView readLockView; transient WriteLockView writeLockView; transient ReadWriteLockView readWriteLockView; /** 锁队列状态, 当处于写模式时第8位为1,读模式时前7为为1-126(附加的readerOverflow用于当读者超过126时) */ private transient volatile long state; /** 将state超过 RFULL=126的值放到readerOverflow字段中 */ private transient int readerOverflow;
StampedLockd源码中的WNote就是等待链表队列,每一个WNode标识一个等待线程,whead为CLH队列头,wtail为CLH队列尾,state为锁的状态。long型即64位,倒数第八位标识写锁状态,如果为1,标识写锁占用!下面围绕这个state来讲述锁操作。
首先是常量标识:
readreaderOverflow
记录。因为readreaderOverflow
不是个原子变量,所以为了保证它的同步性,需要进行同步处理。WBIT=1000 0000(即-128)
RBIT =0111 1111(即127)
SBIT =1000 0000(后7位表示当前正在读取的线程数量,清0)
StampedLock 给我们提供了3种读写模式的锁,如下:
1.写锁writeLock是一个独占锁,同时只有一个线程可以获取该锁,当一个线程获取该锁后,其他请求读锁和写锁的线程必须等待,这跟ReentrantReadWriteLock 的写锁很相似,不过要注意的是StampedLock的写锁是不可重入锁,
当目前没有线程持有读锁或者写锁的时候才可以获取到该锁,请求该锁成功后会返回一个stamp 票据变量来表示该锁的版本,如下源码所示:
/** * *获取写锁,获取失败会一直阻塞,直到获得锁成功 * @return 可以用来解锁或转换模式的戳记(128的整数) */ public long writeLock() { long s, next; return ((((s = state) & ABITS) == 0L && // 完全没有任何锁(没有读锁和写锁)的时候可以通过 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? //第8位置为1 next : acquireWrite(false, 0L)); }
writeLock():典型的cas操作,如果STATE等于s,设置写锁位为1(s+WBIT)。acquireWrite跟acquireRead逻辑类似,先自旋尝试、加入等待队列、直至最终Unsafe.park()挂起线程。
private long acquireWrite(boolean interruptible, long deadline) { WNode node = null, p; for (int spins = -1;;) { // 入队时自旋 long m, s, ns; //无锁 if ((m = (s = state) & ABITS) == 0L) { if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) return ns; } else if (spins < 0) //持有写锁,并且队列为空 spins = (m == WBIT && wtail == whead) ? SPINS : 0; else if (spins > 0) { //恒成立 if (LockSupport.nextSecondarySeed() >= 0) --spins; } else if ((p = wtail) == null) { //初始化队列,写锁入队列 WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } else if (node == null) //不为空,写锁入队列 node = new WNode(WMODE, p); else if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; break;//入队列成功退出循环 } } for (int spins = -1;;) { WNode h, np, pp; int ps; //前驱节点为头节点 if ((h = whead) == p) { if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins;;) { // spin at head long s, ns; //无锁 if (((s = state) & ABITS) == 0L) { if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) { //当前节点设置为头结点 whead = node; node.prev = null; return ns; } } else if (LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } else if (h != null) { // help release stale waiters WNode c; Thread w; //头结点为读锁将栈中所有读锁线程唤醒 while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } // if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) //前驱节点置为等待状态 U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { long time; // 0 argument to park means no timeout if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p) U.park(false, time); // emulate LockSupport.park node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } }
并且StampedLock还提供了非阻塞tryWriteLock方法,源码如下:
/** * 没有任何锁时则获取写锁,否则返回0 * * @return 可以用来解锁或转换模式的戳记(128的整数),获取失败返回0 */ public long tryWriteLock() { long s, next; return ((((s = state) & ABITS) == 0L && U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? next : 0L); }
/** * unit时间内获得写锁成功返回状态值,失败返回0,或抛出InterruptedException * @return 0:获得锁失败 * @throws InterruptedException 线程获得锁之前调用interrupt()方法抛出的异常 */ public long tryWriteLock(long time, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(time); if (!Thread.interrupted()) { long next, deadline; if ((next = tryWriteLock()) != 0L) //获得锁成功 return next; if (nanos <= 0L) //超时返回0 return 0L; if ((deadline = System.nanoTime() + nanos) == 0L) deadline = 1L; if ((next = acquireWrite(true, deadline)) != INTERRUPTED) //规定时间内获得锁结果 return next; } throw new InterruptedException(); }
当释放该锁的时候需要调用unlockWrite方法并传递获取锁的时候的stamp参数。源码如下:
/** * state匹配stamp则释放写锁, * @throws IllegalMonitorStateException 不匹配则抛出异常 */ public void unlockWrite(long stamp) { WNode h; //state不匹配stamp 或者 没有写锁 if (state != stamp || (stamp & WBIT) == 0L) throw new IllegalMonitorStateException(); //state += WBIT, 第8位置为0,但state & SBITS 会循环,一共有4个值 state = (stamp += WBIT) == 0L ? ORIGIN : stamp; if ((h = whead) != null && h.status != 0) //唤醒继承者节点线程 release(h); }
unlockWrite():释放锁与加锁动作相反。将写标记位清零,如果state溢出,则退回到初始值;
2.悲观锁readLock,是个共享锁,在没有线程获取独占写锁的情况下,同时多个线程可以获取该锁;如果已经有线程持有写锁,其他线程请求获取该锁会被阻塞,这类似ReentrantReadWriteLock 的读锁(不同在于这里的读锁是不可重入锁)。
这里说的悲观是指在具体操作数据前,悲观的认为其他线程可能要对自己操作的数据进行修改,所以需要先对数据加锁,这是在读少写多的情况下的一种考虑,请求该锁成功后会返回一个stamp票据变量来表示该锁的版本,源码如下:
/** * 悲观读锁,非独占锁,为获得锁一直处于阻塞状态,直到获得锁为止 */ public long readLock() { long s = state, next; // 队列为空 && 没有写锁同时读锁数小于126 && CAS修改状态成功 则状态加1并返回,否则自旋获取读锁 return ((whead == wtail && (s & ABITS) < RFULL && U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? next : acquireRead(false, 0L)); }
乐观锁失败后锁升级为readLock():尝试state+1,用于统计读线程的数量,如果失败,进入acquireRead()进行自旋,通过CAS获取锁。
如果自旋失败,入CLH队列,然后再自旋,如果成功获得读锁,则激活cowait队列中的读线程Unsafe.unpark(),如果最终依然失败,则Unsafe().park()挂起当前线程。
/** * @param interruptible 是否允许中断 * @param 标识超时限时(0代表不限时),然后进入循环。 * @return next state, or INTERRUPTED */ private long acquireRead(boolean interruptible, long deadline) { WNode node = null, p; //自旋 for (int spins = -1;;) { WNode h; //判断队列为空 if ((h = whead) == (p = wtail)) { //定义 long m,s,ns,并循环 for (long m, s, ns;;) { //将state超过 RFULL=126的值放到readerOverflow字段中 if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) //获取锁成功返回 return ns; //state高8位大于0,那么说明当前锁已经被写锁独占,那么我们尝试自旋 + 随机的方式来探测状态 else if (m >= WBIT) { if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins; } else { if (spins == 0) { WNode nh = whead, np = wtail; //一直获取锁失败,或者有线程入队列了退出内循环自旋,后续进入队列 if ((nh == h && np == p) || (h = nh) != (p = np)) break; } //自旋 SPINS 次 spins = SPINS; } } } } if (p == null) { //初始队列 WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } //当前节点为空则构建当前节点,模式为RMODE,前驱节点为p即尾节点。 else if (node == null) node = new WNode(RMODE, p); //当前队列为空即只有一个节点(whead=wtail)或者当前尾节点的模式不是RMODE,那么我们会尝试在尾节点后面添加该节点作为尾节点,然后跳出外层循环 else if (h == p || p.mode != RMODE) { if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; //入队列成功,退出自旋 break; } } //队列不为空并且是RMODE模式, 添加该节点到尾节点的cowait链(实际上构成一个读线程stack)中 else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) //失败处理 node.cowait = null; else { //通过CAS方法将该节点node添加至尾节点的cowait链中,node成为cowait中的顶元素,cowait构成了一个LIFO队列。 //循环 for (;;) { WNode pp, c; Thread w; //尝试unpark头元素(whead)的cowait中的第一个元素,假如是读锁会通过循环释放cowait链 if ((h = whead) != null && (c = h.cowait) != null && U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); //node所在的根节点p的前驱就是whead或者p已经是whead或者p的前驱为null if (h == (pp = p.prev) || h == p || pp == null) { long m, s, ns; do { //根据state再次积极的尝试获取锁 if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) return ns; } while (m < WBIT);//条件为读模式 } if (whead == h && p.prev == pp) { long time; if (pp == null || h == p || p.status > 0) { //这样做的原因是被其他线程闯入夺取了锁,或者p已经被取消 node = null; // throw away break; } if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, p, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) U.park(false, time); node.thread = null; U.putObject(wt, PARKBLOCKER, null); //出现的中断情况下取消当前节点的cancelWaiter操作 if (interruptible && Thread.interrupted()) return cancelWaiter(node, p, true); } } } } for (int spins = -1;;) { WNode h, np, pp; int ps; if ((h = whead) == p) { if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins;;) { // spin at head long m, s, ns; if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { WNode c; Thread w; whead = node; node.prev = null; while ((c = node.cowait) != null) { if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } return ns; } else if (m >= WBIT && LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } else if (h != null) { WNode c; Thread w; while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { long time; if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) U.park(false, time); node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } }
并且StampedLock还提供了非阻塞tryReadLock方法,源码如下:
/** * 可以立即获得锁,则获取读锁,否则返回0 */ public long tryReadLock() { for (;;) { long s, m, next; //持有写锁返回0 if ((m = (s = state) & ABITS) == WBIT) return 0L; //读线程数 < RFULL,CAS变更状态 else if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) return next; } //将state超过 RFULL的值放到readerOverflow字段 else if ((next以上是关于Java并发编程之StampedLock锁源码探究的主要内容,如果未能解决你的问题,请参考以下文章
Java Review - 并发编程_StampedLock锁探究
JUC并发编程 共享模式之工具 JUC 读写锁 StampedLock -- 介绍 & 使用