Semaphore源码解析
Posted lomoye
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Semaphore源码解析相关的知识,希望对你有一定的参考价值。
应用场景
举个栗子,30个人上一个公共厕所,但是只有3个坑位
/**
 * Demo of {@link Semaphore}: 30 visitors share a public restroom that has
 * only 3 stalls, so at most 3 threads may be inside {@link #wc()} at once.
 */
public class SemaphoreTest1 {
    /** One permit per stall. */
    private final Semaphore semaphore = new Semaphore(3);

    /**
     * Occupies a stall for five seconds and then frees it.
     *
     * <p>The permit is released only if it was actually acquired. If the
     * calling thread is interrupted while waiting for a permit, the method
     * returns without releasing anything; releasing here would add a permit
     * that was never taken and let more than 3 threads in. In both
     * interruption cases the thread's interrupt status is restored so the
     * caller can observe it.
     */
    public void wc() {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            // No permit was obtained, so there is nothing to give back.
            Thread.currentThread().interrupt();
            return;
        }
        try {
            System.out.println("我占个坑位");
            Thread.sleep(5000L);
            System.out.println("我好了");
        } catch (InterruptedException e) {
            // Interrupted while holding the permit: keep the flag set; the
            // finally block below still frees the stall.
            Thread.currentThread().interrupt();
        } finally {
            semaphore.release();
        }
    }

    /**
     * Creates 30 threads that all call {@link #wc()} on one shared instance,
     * then starts them.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SemaphoreTest1 test = new SemaphoreTest1();
        List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < 30; i++) {
            threadList.add(new Thread(test::wc));
        }
        for (Thread thread : threadList) {
            thread.start();
        }
    }
}
semaphore.acquire
/**
* Acquires a permit from this semaphore, blocking until one is
* available, or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns immediately,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* for a permit,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted
*/
public void acquire() throws InterruptedException {
// Delegates to the sync object's interruptible shared-mode acquire, asking for one permit.
sync.acquireSharedInterruptibly(1);
}
sync#acquireSharedInterruptibly
/**
 * Acquires in shared mode, aborting if interrupted. The interrupt status
 * is checked first; then {@link #tryAcquireShared} is invoked at least
 * once, returning on success. Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link #tryAcquireShared}
 * until success or the thread is interrupted.
 *
 * @param arg the acquire argument. This value is conveyed to
 *        {@link #tryAcquireShared} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @throws InterruptedException if the current thread is interrupted
 */
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // Interrupt check: Thread.interrupted() also clears the flag before we throw.
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    // ① tryAcquireShared: a single attempt to acquire in shared mode.
    final boolean acquired = tryAcquireShared(arg) >= 0;
    if (!acquired) {
        // ② doAcquireSharedInterruptibly: the fast attempt failed, so fall
        // back to the queued acquire path.
        doAcquireSharedInterruptibly(arg);
    }
}
①NonfairSync#tryAcquireShared 这里我们看Semaphore的非公平实现
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
// The constructor passes the initial permit count on to the parent class
NonfairSync(int permits) {
super(permits);
}
// Shared-mode acquire attempt for the non-fair variant.
protected int tryAcquireShared(int acquires) {
// Delegates to nonfairTryAcquireShared (per the article, a method of the parent class Sync)
return nonfairTryAcquireShared(acquires);
}
}
Sync#nonfairTryAcquireShared
/**
 * Tries to take {@code acquires} permits, retrying while the CAS on the
 * state loses a race.
 *
 * @param acquires number of permits requested
 * @return the permit count left after subtracting {@code acquires}; a
 *         negative value means there were not enough permits and the
 *         state was left untouched
 */
final int nonfairTryAcquireShared(int acquires) {
    while (true) {
        final int current = getState();
        final int left = current - acquires;
        // Not enough permits: report the shortfall without modifying the state.
        if (left < 0) {
            return left;
        }
        // Enough permits: publish the new count; on CAS failure re-read and retry.
        if (compareAndSetState(current, left)) {
            return left;
        }
    }
}
tryAcquireShared方法如果返回值小于0,说明permit已经被占用完了,需要调用doAcquireSharedInterruptibly
②AQS#doAcquireSharedInterruptibly
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// Add a shared-mode node to the sync queue
final Node node = addWaiter(Node.SHARED);
// Stays true unless the acquire succeeds; drives the cleanup in finally
boolean failed = true;
try {
for (;;) {
// Predecessor node
final Node p = node.predecessor();
// Only try to acquire when the predecessor is the head node
if (p == head) {
int r = tryAcquireShared(arg);
// r >= 0 means the acquire succeeded
if (r >= 0) {
// ① Make this node the new head and propagate if needed
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// Check whether the thread should park; if it parks, then after waking up
// check whether it was interrupted, and abort with an exception if so
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// Reached with failed == true only when leaving via an exception
if (failed)
cancelAcquire(node);
}
}
①AQS#setHeadAndPropagate
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// Release only when the successor is unknown (null) or is a shared-mode waiter
if (s == null || s.isShared())
doReleaseShared();
}
}
semaphore.release
/**
* Releases a permit, returning it to the semaphore.
*
* <p>Releases a permit, increasing the number of available permits by
* one. If any threads are trying to acquire a permit, then one is
* selected and given the permit that was just released. That thread
* is (re)enabled for thread scheduling purposes.
*
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link #acquire}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*/
public void release() {
// Delegates to the sync object's shared-mode release with a count of one permit.
sync.releaseShared(1);
}
sync#releaseShared
/**
 * Releases in shared mode. Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument. This value is conveyed to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    // ① tryReleaseShared: in the Semaphore implementation shown in this
    // article, a CAS loop that adds arg to the state.
    if (!tryReleaseShared(arg)) {
        return false;
    }
    // ② doReleaseShared: signal the successor and keep the release propagating.
    doReleaseShared();
    return true;
}
① AQS#tryReleaseShared,具体实现为Semaphore.Sync#tryReleaseShared
/**
 * Adds {@code releases} permits to the state, retrying while the CAS
 * loses a race.
 *
 * @param releases number of permits to return
 * @return {@code true} once the state has been updated
 * @throws Error if adding {@code releases} would overflow the permit count
 */
protected final boolean tryReleaseShared(int releases) {
    while (true) {
        // Snapshot the current permit count and compute the new one.
        final int before = getState();
        final int after = before + releases;
        // A sum smaller than the starting value means the int wrapped around.
        if (after < before) { // overflow
            throw new Error("Maximum permit count exceeded");
        }
        // Publish the new count; on CAS failure re-read the state and retry.
        if (compareAndSetState(before, after)) {
            return true;
        }
    }
}
②AQS#doReleaseShared
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
// h: snapshot of the head node
Node h = head;
// Head is non-null and is not also the tail
if (h != null && h != tail) {
// Wait status of the head node
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// Switch the status from Node.SIGNAL to 0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// Wake up the head node's successor
unparkSuccessor(h);
}
else if (ws == 0 &&
// Why the PROPAGATE status is needed is fairly involved; see https://www.cnblogs.com/micrari/p/6937995.html
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
博文推荐
1.https://segmentfault.com/a/1190000016447307
2.https://www.cnblogs.com/micrari/p/6937995.html
以上是关于Semaphore源码解析的主要内容,如果未能解决你的问题,请参考以下文章