StampedLock
Posted paulwang92115
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了StampedLock相关的知识,希望对你有一定的参考价值。
简介
StampedLock 类,是 JDK 1.8 时引入,是对读写锁 ReentrantReadWriteLock 的增强,该类提供了一些功能。优化了读锁,写锁的访问。同时使得读锁和写锁之间可以相互转换,更细力度地控制并发。
该类的设计初衷是作为一个内部工具类,用于辅助线程安全组件开发,用的好可以提升系统性能,用不好,容易产生死锁和其他莫名其妙的问题。
原因
既然已经有了 ReentrantReadWriteLock,为什么还要引入 StampedLock?
ReentrantReadWriteLock 使得多个线程同时持有读锁,而写锁是独占的,读写互斥。如果使用不当很容易出现饥饿问题。
比如读线程很多,写线程很少的情况下,就容易使读线程饥饿,虽然公平策略可以一定程度上缓解这个问题,但是公平策略是以牺牲吞吐量为代价的。
特点
- 所有获取锁的方法,都返回一个邮戳(Stamp),Stamp 为 0 表示失败,其余表示成功。
- 所有释放锁的方法,都需要一个邮戳(Stamp),这个 Stamp 必须是和成功获取锁时得到的 Stamp 一致。
- StampedLock 是可重入的,如果一个线程已经持有了写锁,再去获取写锁的话就会造成死锁。
- StampedLock 有三种访问模式:
- Reading(读模式):功能和 ReentrantReadWriteLock 的读锁类似。
- Writing(写模式):功能和 ReentrantReadWriteLock 的写锁类似。
- Optimistic reading(乐观读):这是一种优化的读模式。
- 无论写锁还是读锁,都不支持 Condition 等待。
ReentrantReadWriteLock 中,当读锁被使用时,如果有线程尝试获取写锁,该写线程会被阻塞。但是在 Optimistic reading 中,即使读线程获取到了读锁,写线程尝试获取写锁也不会阻塞。这相当于对读模式的优化,但是可能会导致数据不一致问题,所以使用 Optimistic reading 获取到读锁时,必须对获取到的结果进行校验。
例子
来看看 oracle 官方的例子。
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
// 使用写锁的例子
void move(double deltaX, double deltaY) {
long stamp = sl.writeLock(); //涉及对共享资源的修改,使用写锁-独占操作
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
/**
* 使用乐观读锁访问共享资源
* 注意:乐观读锁在保证数据一致性上需要拷贝一份要操作的变量到方法栈,并且在操作数据时候可能其他写线程已经修改了数据,
* 而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的。
*
* @return
*/
double distanceFromOrigin() {
long stamp = sl.tryOptimisticRead(); // 使用乐观读锁
double currentX = x, currentY = y; // 拷贝共享资源到本地方法栈中
if (!sl.validate(stamp)) { // 如果有写锁被占用,可能造成数据不一致,所以要切换到普通读锁模式
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
//某些情况下,将读锁转换为写锁
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp); //读锁转换为写锁
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
} else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
}
上面的方法中使用了 “distanceFromOrigin” 方法,这个方法中使用了 Optimistic reading 乐观读锁,使得读可以并发执行。但是使用必须遵循以下模式:
long stamp = lock.tryOptimisticRead(); // 非阻塞获取版本信息
copyVaraibale2ThreadMemory(); // 拷贝变量到线程本地堆栈
if(!lock.validate(stamp)){ // 校验
long stamp = lock.readLock(); // 获取读锁
try {
copyVaraibale2ThreadMemory(); // 拷贝变量到线程本地堆栈
} finally {
lock.unlock(stamp); // 释放悲观锁
}
}
useThreadMemoryVarables(); // 使用线程本地堆栈里面的数据进行操作
源码分析
StampedLock 虽然不想其他锁一样定义了内部类来实现 AQS 框架,但是 StampedLock 的基本思路还是利用 CLH 队列进行线程的管理,通过同步状态值来表示锁的状态和类型。
StampedLock 内部定义了很多常量,定义这些常量的根本目的还是和 ReentrantReadWriteLock 一样,对同步状态值按位切分,以通过位运算对 State 进行操作:
对于 StampedLock 来说,写锁被占用的标志是第八位为 1,读锁使用 0-7 位,正常情况下读锁数目为 1-126,当超过 126 时,使用一个名为 readOverflow 的 int 整型保存超出数。
另外,StampedLock 相比于 ReentrantReadWriteLock,对多核 CPU 进行了优化,可以看到,当 CPU 核数超过 1 时,会有一些自旋操作:
构造方法
public StampedLock() {
state = ORIGIN;
}
构造器很简单,构造时设置同步状态的初始值。
另外,StampedLock 还提供了三类试图:
// 视图
transient ReadLockView readLockView;
transient WriteLockView writeLockView;
transient ReadWriteLockView readWriteLockView;
这些视图是对 StampedLock 方法的封装,便于习惯了 ReentrantReadWriteLock 的用户使用。ReadLockView 相当于 ReentrantReadWriteLock.readLock() 返回的读锁。
final class ReadLockView implements Lock {
public void lock() { readLock(); }
public void lockInterruptibly() throws InterruptedException {
readLockInterruptibly();
}
public boolean tryLock() { return tryReadLock() != 0L; }
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return tryReadLock(time, unit) != 0L;
}
public void unlock() { unstampedUnlockRead(); }
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
A 线程调用 writeLock 获取写锁
/**
* 获取写锁,如果获取失败,则进入阻塞
* 该方法不响应中断
*
* @返回非 0 即表示成功
*/
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
return ((((s = state) & ABITS) == 0L && //(s = state) & ABITS == 0 表示读锁写锁都未被使用
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? //通过 CAS 更改第 8 位为 1,表示写锁被占用
next : acquireWrite(false, 0L)); //获取失败则加入等待对列
}
(s = state) & ABITS == 0L
表示读锁和写锁都未被使用,这里写锁可以立即获取成功,然后CAS 操作更新同步状态值State。- 否则加入等待队列。
A 线程获得写锁之后,等待队列的结构如下:
头节点和尾节点都指向为 null。
/** Wait nodes */
static final class WNode {
volatile WNode prev;
volatile WNode next;
volatile WNode cowait; // list of linked readers
volatile Thread thread; // non-null while possibly parked
volatile int status; // 0, WAITING, or CANCELLED
final int mode; // RMODE or WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}
/** Head of CLH queue */
private transient volatile WNode whead;
/** Tail (last) of CLH queue */
private transient volatile WNode wtail;
B 线程调用 readLock 获取读锁:
/**
* 获取读锁,如果写锁被占用,线程会被阻塞
* 该方法不响应中断
*
* @返回非 0 表示获取成功
*/
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
return ((whead == wtail && (s & ABITS) < RFULL && //(s & ABITS) < RFULL 表示写锁违背占用,且读锁数量没有超出限制
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
由于此时 A 线程持有写锁,所以 B 获取读锁失败,将调用 acquireRead 方法,加入等待队列。
总结
StampedLock 的等待队列于 ReentrantReadWriteLock 的 CLH 相比,有以下特点:
- 当入队一个线程时,如果队尾时读节点,不会直接链接到队尾,而是连接到读节点的 cowait 链中,cowait 链本质是一个栈。
- 当入队一个线程时,如果队尾是写节点,则直接链接到队尾。
- 唤醒线程的规则和 AQS 类似,首先是唤醒队首节点。区别是,如果唤醒的节点如果是读节点,会唤醒该读节点 cowait 链中所有读节点。
以上是关于StampedLock的主要内容,如果未能解决你的问题,请参考以下文章