Semaphore 使用及源码分析
Posted 小猪快跑22
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Semaphore 使用及源码分析相关的知识,希望对你有一定的参考价值。
前言:
之前的博文也做了一些关于AQS相关的线程同步的分析,这篇分析下 Semaphore 的使用和原理。
一、Semaphore 的用法
Semaphore就类似于许可证的概念,获取到许可就可以执行,不然就得等待。
举个例子:一个超小型停车场只有3个车位,那么前3名来停车是可以获取到许可的,即可以停车;后面来的车辆必须要等前面的3辆车之一的出去,即释放许可。
Semaphore semaphore = new Semaphore(2);
semaphore.acquire()
默认的表示申请一个许可证,还有带参数的,表示申请多个许可证 public void acquire(int permits)
;
release()
: 表示释放一个许可证;同样的 public void release(int permits)
表示释放多个许可证。
写个例子,例子如下:
Semaphore semaphore = new Semaphore(2);
new Thread("A") {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("thread "+Thread.currentThread().getName() + " >>> start , time = "+System.currentTimeMillis());
Thread.sleep(2_000);
semaphore.release(1);
System.out.println("thread "+Thread.currentThread().getName() + " >>> end !!! , time = "+System.currentTimeMillis())
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
new Thread("B") {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("thread "+Thread.currentThread().getName() + " >>> start , time = "+System.currentTimeMillis());
Thread.sleep(2_000);
semaphore.release();
System.out.println("thread "+Thread.currentThread().getName() + " >>> end !!!, time = "+System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
new Thread("C") {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("thread "+Thread.currentThread().getName() + " >>> start , time = "+System.currentTimeMillis());
Thread.sleep(2_000);
semaphore.release();
System.out.println("thread "+Thread.currentThread().getName() + " >>> end !!!, time = "+System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
例子中 Semaphore 的许可证为2,表示最多同时允许2个线程获取许可。然后线程 A 、B、C分别去获取许可,可以发现,线程C 只有等线程A或线程B释放了许可之后才可以获取许可继续执行。
结果如下:
thread A >>> start , time = 1631950075265
thread B >>> start , time = 1631950075266
thread A >>> end !!! , time = 1631950077270
thread C >>> start , time = 1631950077270
thread B >>> end !!!, time = 1631950077270
thread C >>> end !!!, time = 1631950079275
二、aquire() 方法解析
Semaphore 的内部类 Sync 继承了AQS: abstract static class Sync extends AbstractQueuedSynchronizer
;
非公平锁的实现类 NonfairSync继承了Sync,如下:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
前面说过,Semaphore分为公平锁和非公平锁,这里主要说非公平锁。
aquire
方法如下:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly
调用的是AQS的方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 线程是否被中断,是则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁,小于0表示获取失败,
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared
调用的是 Semaphore 内部类 NonfairSync
的方法,如下:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
nonfairTryAcquireShared
是 NonfairSync 父类 Sync 的方法:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取当前剩余许可
int available = getState();
// 当前剩余许可减去你申请的许可
int remaining = available - acquires;
// 小于0 或者 cas 操作设置 state值失败则返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
如果 tryAcquireShared
返回值小于0,就执行 doAcquireSharedInterruptibly
,如下:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 把当前结点对应的线程插入到等待队列的尾部,且标记改结点是共享的
final Node node = addWaiter(Node.SHARED); // 注释【1】
boolean failed = true;
try {
for (;;) { // 自旋
// 获取 node 的前驱结点
final Node p = node.predecessor(); // 注释【2】
// 如果前驱结点是头结点 head
if (p == head) { // 注释【3】
// 尝试去获取许可证
int r = tryAcquireShared(arg); // 注释【4】
// r>=0表示获取到证书,那么设置当前结点为头结点,并且尝试唤醒 node的后继结点
if (r >= 0) {
setHeadAndPropagate(node, r); // 注释【5】
p.next = null; // help GC
failed = false;
return;
}
}
// 主要是判断是否需要挂起线程
if (shouldParkAfterFailedAcquire(p, node) // 注释【6】
&& parkAndCheckInterrupt()) // 挂起当前线程 // 注释【7】
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
上面的注释大家可能看的比较懵逼,下面举个例子来加以说明:
假设 Semaphore semaphore = new Semaphore(2)
,有4个线程A、B、C、D去获取许可,假设4个线程的都是执行耗时的任务。
由于开始有2个许可,那么线程A和B是可以拿到许可继续执行任务的,此时剩余许可等于0了,那么 线程C 再去获取许可会失败,会走到上面的方法 doAcquireSharedInterruptibly
中;
注释【1】:addWaiter(Node.SHARED)
的方法如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 注释【1】
if (pred != null) { // 注释【2】
node.prev = pred; // 注释【3】
if (compareAndSetTail(pred, node)) { // 注释【4】
pred.next = node;
return node;
}
}
enq(node); // 注释【5】
return node;
}
- new 一个结点:
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
设置 nextWaiter
为 Node.SHARED 结点,作用就是用来判断该结点是否是共享结点:
final boolean isShared() {
return nextWaiter == SHARED;
}
由于线程C是第一个申请许可失败的,所以 addWaiter
方法的注释【1】中,pred = tail 是等于 null 的,走到 注释【5】的 enq(node)
,如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// t = null 表示等待队列为空需要初始化
if (t == null) {
// new 一个空结点作为头结点 head
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 设置 node 为尾结点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
执行完上述的操作如下图:
addWaiter
方法总结:操作其实很简单就是双链表的尾插法,主要有2点:
- 如果等待队列为空,即队列未初始化,那么需要新建一个空的结点,然后让head指向该空结点。
- 然后采用尾插法将当前线程对应的结点插入队列的尾部,注意,由于这里面是共享队列,所以
node
的nextWaiter = Node.SHARED
。
分析到这里,其实才分析了 doAcquireSharedInterruptibly
方法注释【1】,现在来看注释【2】:
final Node p = node.predecessor()
: p 指向 node的前驱结点,即 head 结点;由于 p 指向 head ,所以注释【3】成立;然后走到注释【4】再次尝试去获取许可证;
我们假设 线程A 和 线程B执行的是耗时任务,所以再次获取许可证还是会失败,会走到注释【6】的 shouldParkAfterFailedAcquire
方法,如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 表示已经设置为SIGNAL,需要在释放的时候唤醒后继结点
return true;
if (ws > 0) {
/*
* 表示该结点pred已经被取消,那么跳过该结点,然后从该结点向前重试,
* 跳过已取消的结点,直到遇到未被取消的结点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 设置当前线程结点的前驱结点的waitStatus为SIGNAL,
// 为SIGNAL则表示该结点的后继结点需要被唤醒
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
线程C 走到这里,由于线程C结点的前驱结点是 head,且 head 的 waitStatus = 0
,所以走到
compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
即设置 head 的waitStatus = SIGNAL,然后返回false,接着继续 doAcquireSharedInterruptibly
方法的for循环;最终还是走到 shouldParkAfterFailedAcquire中,此时的 pred 对应的任然是 head,即 waitStatus 为 SIGNAL,直接返回true,返回ture 则会执行 doAcquireSharedInterruptibly 方法中的 注释【7】,执行parkAndCheckInterrupt
挂起当前线程,如下图:
后续的线程D 申请许可的时候,流程还是一样的,如下图:
上述就是获取许可证的流程,下面分析下释放许可的流程。
三、release 方法
semaphore.release() 调用的方法如下:
public void release() {
sync.releaseShared(1);
}
然后调用AQS类中的方法,如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
我们假设此时线程A的任务执行完了,要释放许可了,首先执行tryReleaseShared
,,调用的是Semaphore 内部类Sync中的方法如下:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState(); // 获取当前许可证数量
// 当前许可证数量加上要释放的许可证数量,默认的加1
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 通过cas更新 state 数量为next
if (compareAndSetState(current, next))
return true;
}
}
那么,线程A执行完 tryReleaseShared 方法后,state = 1。然后执行doReleaseShared
,如下:
private void doReleaseShared() {
for (;;) {
Node h = head;
// h 不等于 null且 不等于 tail
if (h != null && h != tail) {
int ws = h.waitStatus;
// ws = SIGNAL,表示后继结点需要唤醒
if (ws == Node.SIGNAL) {
// 设置 头结点 head 的waitStatus = 0,
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); // 唤醒head结点的后继结点对应的线程
}
// ws == 0,那么就设置waitStatus= PROPAGATE
// 即表示要唤醒head后继结点的后继结点
// 这种情况可能发生的情况就是线程A释放许可刚通过cas操作
// compareAndSetWaitStatus(h, Node.SIGNAL, 0)设置waitStatus=0
// 此时,线程B也恰好释放许可,发现waitStatus=0,那么设置waitStatus=PROPAGATE
// waitStatus=PROPAGATE就表示要唤醒head后继结点的后继结点
// 想想也是这个道理,A和B都释放许可了,肯定要唤醒后继结点的后继结点啊,
// 不然已经有2张许可却只有C拿到许可,D还在傻乎乎的等C释放许可来唤醒D
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
unparkSuccessor
就是唤醒head后继结点对应的线程,如果该结点被取消了,那么从tail结点开始往前找,找到未被取消结点对应的线程来唤醒。
线程唤醒了,那么被唤醒的线程C要接着执行 doAcquireSharedInterruptibly
中的for循环,部分代码如下:
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, nod
parkAndCheckInterrupt())
throw new InterruptedException();
}
线程C继续for循环,由于线程A已经释放了许可,所以此时 r = 0,所以走到setHeadAndPropagate
,如下:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); // 设置当前线程对应的结点为 head
// propagate > 0表示可以尝试唤醒 node 结点的后继结点
// (h = head) == null || h.waitStatus < 0) 可能会引起没必要的唤醒操作
// 比如线程A任务结束后释放许可,但是线程B任务还没结束,此时线程C获取到许可走到这里
// 执行完上面的 setHead 后,然后 h = head 即h执行线程C的结点,
// 而线程C对应的结点的 waitStatus = SIGNAL,所以也会执行doReleaseShared唤醒线程D
// 线程D唤醒后接着去执行 doAcquireSharedInterruptibly 中的for循环,
// 执行 tryAcquireShared 去拿许可证的时候发现是小于0,接着还是会挂起线程D
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) // 是否是共享的,就是用到了 NODE.SHARED来表示的
doReleaseShared(); // 唤醒 node结点的后继结点,这个上面分析过
}
}
上面setHeadAndPropagate
方法参数 propagate > 0的情况就是,线程A
和线程B
差不多同时释放了许可,那么需要唤醒下一个结点的线程。
总结:
- 获取许可的时,如果发现许可用完了,则会
new Node
且设置 nextWaiter = Node.SHARED,然后插入等待队列的尾部,如果等待队列未初始化,则 new 一个空节点作为head。 - 释放许可的时候,会唤醒head结点的后继结点,也可能会唤醒后继结点的后继结点,如上所分析的那样。
以上是关于Semaphore 使用及源码分析的主要内容,如果未能解决你的问题,请参考以下文章