Java并发源码解析
Posted 0zcm01ql
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发源码解析相关的知识,希望对你有一定的参考价值。
Semaphore前情提要:在学习本章前,需要先了解笔者先前讲解过的ReentrantLock源码解析,ReentrantLock源码解析里介绍的方法有很多是本章的铺垫。下面,我们进入本章正题Semaphore。
从概念上来讲,信号量(Semaphore)会维护一组许可证用于限制线程对资源的访问,当我们有一资源允许线程并发访问,但我们希望能限制访问量,就可以用信号量对访问线程进行限制。当线程要访问资源时,要先调用信号量的acquire方法获取访问许可证,当线程访问完毕后,调用信号量的release归还许可证。使用信号量我们可以服务做限流,尤其像淘宝天猫这样平时访问量就很大的电商大户,在双十一的时候更要评估其服务能承受的访问量并对其做限流,避免因为访问量过大导致服务宕机。然而,Semaphore内部实际上并没有维护一组许可证对象,而是维护一个数字作为许可证数量,如果线程要获取许可证,则会根据线程请求的许可证数量扣减内部的维护的数量,如果足够扣除则线程获取许可证成功,否则线程必须陷入阻塞,直到信号量内部的许可证数量足够。
我们来看下面的代码,假设OrderService是一个远程服务,我们预估这个服务能承受的并发量是5000,访问一次远程服务需要获取一个许可证,执行methodA()的业务只需要请求一次远程服务,所以
public class OrderService {
private Semaphore semaphore = new Semaphore(5000);
public void methodA() {
try {
semaphore.acquire();
//methodA body
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
public void methodB() {
try {
semaphore.acquire(2);
//methodB body
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(2);
}
}
}
如果是许可证为1的信号量可以把它当做互斥锁,这时信号量只有两个状态:0或者1,我们把1代表锁未被占用,0代表锁被占用。如果是用这种方式将信号量当做互斥锁我们可以用一个线程来获取锁,而另一个线程来释放锁,比如下面的<1>处和<2>处分别在不同的线程加锁和释放锁。某种程度上来说这一做法可以避免死锁,与传统java.util.concurrent.locks.Lock的实现会有很大的不同,传统的Lock实现,比如:ReentrantLock会要求解锁的线程必须要是原先加锁的线程,否则会抛出异常。
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(1);
new Thread(() -> {
try {
semaphore.acquire();//<1>
System.out.println(Thread.currentThread().getName() + "获取独占锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程1").start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
semaphore.release();//<2>
System.out.println(Thread.currentThread().getName() + "释放独占锁");
}, "线程2").start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
当信号量的许可证数量为0时,如果还有线程请求获取许可证,信号量会将线程放入一个队列,然后挂起线程,直到有许可证被归还,信号量会尝试唤醒队列中等待许可证最长时间的线程。所以信号量就分为公平(FairSync)和非公平(NonfairSync)两种模式。在公平模式下,如果有线程要获取信号量的许可证时,会先判断信号量维护的等待队列中是否已经有线程,如果有的话则乖乖入队,没有才尝试请求许可证;而非公平模式则是直接请求许可证,不管队列中是否已有线程在等待信号量的许可证。
而下面的代码也印证了笔者之前所说的,信号量本身并不会去维护一个许可证对象的集合,当我们把许可证数量传给信号量的构造函数时,最终会由静态内部类Sync调用其父类AQS的setState(permits)方法将许可证赋值给AQS内部的字段state,由这个字段决定信号量有多少个许可证,请求许可证的线程能否成功。
public class Semaphore implements java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
//...
Sync(int permits) {
setState(permits);
}
//...
}
static final class NonfairSync extends Sync {//非公平
NonfairSync(int permits) {
super(permits);
}
//...
}
static final class FairSync extends Sync {//公平
//...
FairSync(int permits) {
super(permits);
}
//...
}
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
//...
}
从上面节选的代码来看,官方更推荐使用非公平的信号量,因为根据许可证数量创建信号量默认使用的非公平信号量,而相比于公平信号量,非公平信号量有更高的吞吐量。因此笔者先介绍非公平信号量,再介绍公平信号量。
我们先来看看acquire()和acquire(int permits) 这两个方法,可以看到不管我们是请求一个许可证,还是请求多个许可证,本质上都是调用Sync.
acquireSharedInterruptibly(int arg)方法。如果大家观察静态内部类Sync的代码可以发现:Sync并没有实现acquireSharedInterruptibly(int arg)方法,而是其父类AQS实现了此方法。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//...
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//...
}
所以我们来看看非公平锁实现的tryAcquireShared(int arg)方法,在非公平锁的ryAcquireShared(int arg)方法中会调用到Sync类实现的nonfairTryAcquireShared(int acquires)方法,这个方法会先获取当前信号量剩余的许可证数量available,然后减去请求的数量(available - acquires)得到剩余许可证数量remaining,如果remaining大于0代表信号量现有的许可证数量是允许分配调用线程请求的许可证数量,是允许分配的,所以<1>处的条件为false,会进行<2>处的CAS扣减,如果能扣减成功,则返回剩余许可证数量,返回的remaining如果大于等于0,则代表扣减成功,如果小于0代表请求失败,表示信号量现有的许可证数量不足调用线程所需。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||//<1>
compareAndSetState(available, remaining))//<2>
return remaining;
}
}
}
//...
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
//...
}
如果在<1>处执行tryAcquireShared(arg)尝试获取许可证失败,则会调用<2>处的方法将当前线程挂起。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//<1>
doAcquireSharedInterruptibly(arg);//<2>
}
那么我们来看看如果调用tryAcquireShared(arg)请求许可证失败后,doAcquireSharedInterruptibly(int arg)里面完成的逻辑。如果有看过笔者前一章ReentrantLock源码解析的朋友在看到这个方法应该会觉非常熟悉,这里会先调用<1>处的addWaiter(Node mode)方法将当前请求许可证的线程封装成一个Node节点并入队,这里我们也首次看到使用Node.SHARED的地方,如果一个节点Node的nextWaiter指向的是静态常量Node.SHARED,则代表这个节点是一个共享节点,换句话说这个节点的线程可以和其他同为共享节点的线程共享资源。
当线程作为节点入队后,判断节点的前驱节点是否是头节点,如果是头节点则话则进入<2>处的分支,这里会再次调用tryAcquireShared(arg)请求许可证,之前说过如果tryAcquireShared(arg)返回的结果大于等于0代表请求许可证成功,否则请求失败。如果请求失败的话,之后的流程大家想必都清楚了,会先执行shouldParkAfterFailedAcquire(p, node)判断前驱节点p的等待状态是否为SIGNAL(-1),如果为SIGNAL则直接返回true,调用parkAndCheckInterrupt()阻塞当前线程,如果前驱节点p的等待状态为0,会先用CAS的方式修改为SIGNAL,然后再下一次循环中阻塞当前线程。
以上是关于Java并发源码解析的主要内容,如果未能解决你的问题,请参考以下文章