AQS实现共享锁及CountDownLatch源码解析
Posted Pimow博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS实现共享锁及CountDownLatch源码解析相关的知识,希望对你有一定的参考价值。
内容目录
AQS实现共享锁释放共享锁ConuntDownLatch简介CountDownLatch源码解析内部类syncCountDownLatch()初始化await()方法countDown()方法总结阅读更多文章
AQS实现共享锁
1:共享锁shared是一个乐观锁。可以允许多个线程阻塞点,可以多个线程同时获取到锁。
它允许一个资源可以被多个读操作,或者被一个写操作访问,但是两个操作不能同时访问。
/**
* 获取共享锁
*/
public final void acquireShared(int arg) {
//tryReleaseShared()返回值标志了是否允许其他线程继续进入获取失败;返回大于等于允许,否则不允许
if (tryAcquireShared(arg) < 0)
//获取锁失败调用该方法
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
//把当前节点添加对clh队列中,addWaiter()讲aqs实现独占锁已经详细分析,此处讲解大概
//1:快速添加,判断tail节点是否为空,不为空直接添加
//2:为空调用enq()方法,初始化head和tail
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前节点的prev节点
final Node p = node.predecessor();
//判断是否是head头结点
if (p == head) {
//返回负数表示获取失败;返回大于等于0表示成功
int r = tryAcquireShared(arg);
// 一旦共享获取成功,设置新的头结点,并且唤醒后继线程
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//shouldParkAfterFailedAcquire()之前也讲过,此处讲解大概
//1:获取前驱节点状态,等于-1返回true,大于0循环删除掉取消的节点,
//执行到这里代表节点是0或者PROPAGATE,CAS修改为SIGNAL,但是还不能park挂起线程。需要重试是否能获取,如果不能则挂起。
//重试的目的是有可能代码执行到这里的时候上个线程unpark啦。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 这个函数做的事情有两件:
* 1. 设置当前节点为head节点
* 2. 判断是否要唤醒后继节点
*/
private void setHeadAndPropagate(Node node, int propagate) {
// head节点赋值
Node h = head;
//把当前节点设置为head
setHead(node);
/*
* propagate是tryAcquireShared()的返回值,是传播唤醒的依据之一。
* h.waitStatus为SIGNAL或者PROPAGATE时也根据node的下一个节点共享来决定是否传播唤醒,此处不涉及condition只需考虑waitstatus小于0
* h == null 不知道是什么情况出现的,如果有知道的大佬希望可以指点
* 这里为什么不能只用propagate > 0来决定是否可以传播? 网上查阅资料说是为了解决信号量的bug
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
/*
* s 如果是共享类型节点,则应该唤醒该节点
* s=null,并不能说明s是尾节点 为什么会为空呢?由于cpu的切换,可能会导导致tail节点跟前面的节连接不上。enq()方法已经讲了原理
*/
if (s == null || s.isShared())
doReleaseShared();
}
}
/**
* 唤醒下一个线程或者设置传播状态。
* 后继线程被唤醒后,会尝试获取共享锁,如果成功之后,则又会调用setHeadAndPropagate,将唤醒传播下去。
* 这个函数的作用是保障在acquire和release存在竞争的情况下,保证队列中处于等待状态的节点能被唤醒。
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
// 如果队列中存在尾节点,并且head节点不为空
if (h != null && h != tail) {
//获取h节点状态
int ws = h.waitStatus;
//如果等于SIGNAL,则把这个节点状态改为0,并且唤醒这个节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
// 如果h节点的状态为0,需要设置为PROPAGATE。
//这样的话获取锁的线程在执行setHeadAndPropagate时可以读到PROPAGATE,从而由获取锁的线程去释放后继等待线程。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 检查h是否仍然是head,如果不是的话需要再进行循环。
if (h == head)
break;
}
}
释放共享锁
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//实现共享锁的时候已经介绍改方法
doReleaseShared();
return true;
}
return false;
}
ConuntDownLatch简介
for example:大头儿子和小头爸爸各自出去玩,出门前,妈妈说,你们两个要一起回家,如果大头儿子没回来,那小头爸爸就只能在家门口等着,我不开门。你们必须要一起敲门回来我再开门。这个例子说明CountDownLatch有一个计数器,当计数器不为零时所有线程一直阻塞。当计数器为零时,则所有等待的线程就全部唤醒开始工作。
countDownLatch类中只有下面几个方法,实现蛮简单。其中有一个内部类sync继承AbstractQueuedSynchronizer
CountDownLatch源码解析
内部类sync
/**
* 使用AQS的state代表count的值
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
//初始化state值,调用aqs的setState()设置值
Sync(int count) {
setState(count);
}
//获取state值
int getCount() {
return getState();
}
//获取共享锁
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//释放共享锁
protected boolean tryReleaseShared(int releases) {
// 递减state,如果state数为0返回false
for (;;) {
int c = getState();
if (c == 0)
return false;
//state减1,通过cas修改state值
int nextc = c-1;
if (compareAndSetState(c, nextc))
//如果state=1返回true,返回true会调用aqs类doReleaseShared()方法
return nextc == 0;
}
}
}
CountDownLatch()初始化
public CountDownLatch(int count) {
if (count < 0)
throw new IllegalArgumentException("count < 0");
//构造函数内完成了sync的初始化,并设置state值
this.sync = new Sync(count);
}
await()方法
1:判断初始化的state是否等于0,等于直接往后执行。否则执行doAcquireSharedInterruptibly,会阻塞
public void await() throws InterruptedException {
//调用了aqs的acquireSharedInterruptibly()方法
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//判断线程是否终端,终端抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//tryAcquireShared()方法判断state是否等于0,0返回1,否则-1
if (tryAcquireShared(arg) < 0)
//如果返回-1说明还有线程没有执行countDown()
doAcquireSharedInterruptibly(arg);
}
/**
* 实现共享锁可中断
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//加入一个节点到CLH队列,这个加入节点的过程在之前的aqs文章已讲解
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取当前节点的prev节点
final Node p = node.predecessor();
//判断节点是否是头节点
if (p == head) {
//判断state是否等于0,0返回1,否则-1
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//此处逻辑跟之前AQS实现独占锁逻辑一样
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
countDown()方法
1:state的值-1,
public void countDown() {
//调用aqs的releaseShared()方法,然后调用sync类中tryReleaseShared()并且返回true,说明要唤醒阻塞的线程,调用 doReleaseShared();
sync.releaseShared(1);
}
总结
一:AQS实现共享锁
1:获取共享同步状态
2:若获取失败,生成节点,进入队列
3:如果前驱为头结点,再次获取共享同步状态
4:获取成功则将自己设为头结点,如果后继节点是共享类型的,则唤醒
5:将节点状态设为 SIGNAL,失败则unpark。如果节点状态为0,修改为PROPAGATE
二:countDountLatch()
1:初始化state
2:调用一次countDown(),然后调用sync类中tryReleaseShared()并且返回true,说明要唤醒阻塞的线程,调用doReleaseShared();返回false说明不需阻塞。
3:await()判断初始化的state是否等于0,等于直接往后执行。否则执行doAcquireSharedInterruptibly。继续判断是否是头结点,并且再一次判断state的值。等于0设置为setHeadAndPropagate。否则阻塞。
阅读更多文章
以上是关于AQS实现共享锁及CountDownLatch源码解析的主要内容,如果未能解决你的问题,请参考以下文章