并发限流利器Semaphore
Posted 陈皮的JavaLib
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发限流利器Semaphore相关的知识,希望对你有一定的参考价值。
1 Semaphore 是什么?
Semaphore
,一个计数信号量,JDK 1.5 开始提供的一个同步工具。Semaphore 信号量被用来限制对某些资源同时访问的线程数量。例如接口限流,控制一个文件允许的并发访问数等等。
从概念上讲,Semaphore 维护着一组许可证,每一个需要访问资源的线程都需要从 Semaphore 拿到许可证。
其实,没有真正意义上的许可证对象,Semaphore 是通过维护一个数字来代表可获取的许可证数量,并进行加减操作。每一个调用 acquire
方法的线程会阻塞直到拿到许可证为止;调用 release
方法则会向 Semaphore 增加一个许可证。
而且使用 Semaphore 时,锁还可以被其他线程释放(即增加许可证),这在某些特定情况下非常有用,例如可用于死锁的解除。
package com.chenpi;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* @Description
* @Author 陈皮
* @Date 2021/7/11
* @Version 1.0
*/
public class ChenPiMain {
public static void main(String[] args) {
// 许可证数量为2,即代表同时只能有2个线程能访问资源
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "开始执行...");
TimeUnit.SECONDS.sleep(new Random().nextInt(5) + 1);
System.out.println(Thread.currentThread().getName() + "结束执行...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
}
}
// 输出结果如下
Thread-1开始执行...
Thread-0开始执行...
Thread-1结束执行...
Thread-3开始执行...
Thread-3结束执行...
Thread-2开始执行...
Thread-0结束执行...
Thread-4开始执行...
Thread-4结束执行...
Thread-2结束执行...
2 Semaphore 充当互斥锁
只有一个许可证的信号量,可用作互斥锁(类似于 synchronized ),也通常被称为二进制信号量,因为它只有两种状态,一种状态是当前有一个许可证可被获取,另一种状态是当前没有许可证可被获取。
package com.chenpi;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
/**
* @Description
* @Author 陈皮
* @Date 2021/7/11
* @Version 1.0
*/
public class ChenPiMain {
// 共享资源
private static int count = 0;
// 线程数量
private static final int MAX_THREAD = 1000;
public static void main(String[] args) throws InterruptedException {
// 许可证数量为1,充当互斥锁
Semaphore semaphore = new Semaphore(1);
List<Thread> threads = new ArrayList<>(MAX_THREAD);
for (int i = 0; i < MAX_THREAD; i++) {
Thread thread = new Thread(() -> {
try {
semaphore.acquire();
// 每一个线程都对共享资源操作
count++;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
});
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println(count); // 输出结果1000
}
}
3 Semaphore 源码解析
Semaphore 的类结构图如下:
Semaphore 底层主要通过 AQS(AbstractQueuedSynchronizer)来实现的。我们在构造 Semaphore 时,传入的许可证数量,最终传递给了 AQS 的 state
属性,许可证数量即代表允许同时多少线程访问资源。
public class Semaphore implements java.io.Serializable {
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
...
}
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
...
}
...
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private volatile int state;0
protected final void setState(int newState) {
state = newState;
}
...
}
Semaphore 的构造方法接收的许可证数量参数,值可以为负数,这样的话需要保证在调用 acquires 方法之前调用过 releases 方法来增加许可证,不然会导致没有线程能获取到许可证。
// 许可证数量为负数
Semaphore semaphore = new Semaphore(-1);
// 所以需要先调用release,那么acquires方法才有意义
semaphore.release();
semaphore.release();
...
semaphore.acquire();
其实 Semaphore 的重点还是 AQS(AbstractQueuedSynchronizer),它只是对 AQS 的封装调整而已。Semaphore 类内部定义了一个抽象静态内部类 Sync
,它负责 Semaphore 的同步实现,Sync 还继承了抽象类 AQS,所以间接使用了 AQS 类的相关方法。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
...
}
Semaphore 的构造方法不仅接收许可证数量,还可以接收一个布尔类型的参数 fairness
,代表是否为公平锁模式还是非公平锁模式。
如果为公平锁则可以使最先调用 acquire 方法的线程优先获取许可证,即阻塞队列中排在前面的线程优先获取到许可证(FIFO)。
如果是非公平锁,则不保证线程获得许可证的顺序,即使阻塞队列中有等待的线程,一个新调用 acquire 的线程会先尝试获取许可证,如果许可证数量不满足才插入队列中进行排队。即一个线程 a 调用 acquire 方法尝试获取许可时,这时刚好有线程 b 释放了许可,并唤醒阻塞队列中第一个等待的线程 b,此时线程a 和线程 b 会共同竞争释放出来的许可证,即线程 a 没有进阻塞队列等待就和线程 b 一起竞争许可证了。
注意,FIFO 排序必须适用于这些方法中的特定内部执行点。 因此,一个线程可能在另一个线程之前调用 acquire 方法,但可能在另一个线程之后到达排序点,类似地,从方法返回时也是如此。所以不能绝对认为先调用 acquire 方法的线程一定先获取到许可证。
还有,不计时的 tryAcquire 方法不会遵循公平原则,它也是会获取目前可用的许可证,而不管前面是否有线程在排队。
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
NonfairSync
和 FairSync
两个 final 类都是 Sync 的子类,即公平版本和非公平版本。他们都重写了 tryAcquireShared
方法,这个是定义在 AQS 类的方法(方法实现只抛出异常,即要求让实现类重写),
非公平锁模式下尝试获取共享锁的源代码如下,会发现线程直接尝试获取许可,如果许可不够才进入阻塞队列。
// 自旋 + CAS 尝试获取锁
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 可获得的许可证数量
int available = getState();
// 预期剩余的许可证数量
int remaining = available - acquires;
// 如果许可证数量小于0或者cas设置state成功,则退出
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
公平锁模式下尝试获取共享锁的源代码如下,会发现线程尝试获取许可之前,会先判断队列中是否有线程在等待。
// 自旋 + CAS 尝试获取锁
protected int tryAcquireShared(int acquires) {
for (;;) {
// 判断等待队列是否有线程
if (hasQueuedPredecessors())
return -1;
// 可获得的许可证数量
int available = getState();
// 预期剩余的许可证数量
int remaining = available - acquires;
// 如果许可证数量小于0或者cas设置state成功,则退出
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
通常情况下,semaphore 被用于控制资源的线程访问数量时,一般初始化为公平锁,这样不会产生饥饿线程。当然,当 semaphore 用于其他类型的同步控制的时候,非公平锁的吞吐量优于公平锁的。
对于释放许可证,源码如下,也是基于 cas 机制进行释放许可证。
public void release() {
// 释放一个许可证
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 尝试释放
if (tryReleaseShared(arg)) {
// 唤醒阻塞的线程
doReleaseShared();
return true;
}
return false;
}
// 自旋 + CAS 尝试释放锁
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
// 负数代表越界
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
内存一致性协议,release 方法调用 happen-before 于 acquire 方法调用。
默认情况下,我们一般一个线程获取一个许可,或者释放一个许可。当然,Semaphore 也支持一次获取或者释放多个许可的,源码如下所示:
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
acquire()
方法是获取许可时,是可被中断的,如果想不被中断,可以使用 acquireUninterruptibly()
方法,
// 可被中断模式
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 检测线程是否被中断
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
// 此方法内部也会抛出中断异常
doAcquireSharedInterruptibly(arg);
}
// 不可被中断模式
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
上面提到的 acquire 如果获取不到许可证的时候,是会一直阻塞的。如果向此时获取不到许可的话,不阻塞马上返回或者等待指定的时间,可以调用如下方法:
// 尝试获取共享锁,不阻塞。而且不管公平还是非公平模式,都不遵循FIFO原则
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
// 计时的尝试获取共享锁
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
4 Semaphore 主要方法
- void acquire():获取一个许可证,在获得一个许可证前线程将一直阻塞,或者线程被中断。
- void acquire(int permits):获取指定数量的许可证,在获得一个许可证前线程将一直阻塞,或者线程被中断。
- boolean tryAcquire(int permits):尝试获取指定数量的许可证,不阻塞。
- boolean tryAcquire(int permits, long timeout, TimeUnit unit):尝试在指定时间内获取指定数量的许可证。
- void release():释放一个许可证,将其返回给信号量。
- void release(int permits):释放给定数量的许可证,将它们返回给信号量。
- int availablePermits():返回当前可用的许可证数,此方法通常用于调试和测试目的。
- boolean hasQueuedThreads():查询是否有线程正在等待获取。
- int drainPermits():一次性取走剩下的所有许可证。
- Collection<Thread> getQueuedThreads():获取在排队中的线程列表。
- int getQueueLength():获取排队列表的长度。
- boolean hasQueuedThreads():阻塞队列中是否还有等待的线程。
- void reducePermits(int reduction):减少指定数量的许可证,不会唤醒阻塞中的线程。
5 Semaphore 应用案例
下面介绍下 Semaphore 的简单应用,即某公司只有2个厕所坑位,每一个坑位同时只允许一个员工带薪拉屎使用,而且上厕所讲究先来后到,所以使用公平锁模式。
package com.chenpi;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* @Description
* @Author 陈皮
* @Date 2021/9/11
* @Version 1.0
*/
public class Pool {
// 代表厕所的个数
private static final int MAX_AVAILABLE = 2;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
// 争抢厕所
public Toilet getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
// 使用完退回厕所
public void putItem(Toilet x) {
if (markAsUnused(x)) {
available.release();
}
}
// 资源,2个厕所
protected Toilet[] items = {new Toilet("一号厕所"), new Toilet("二号厕所")};
// 标识厕所是否在使用中
protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Toilet getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null;
}
protected synchronized boolean markAsUnused(Toilet item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else {
return false;
}
}
}
return false;
}
public static void main(String[] args) {
Pool pool = new Pool();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
Toilet toilet = pool.getItem();
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println(Thread.currentThread().getName() + "开始使用" + toilet.getName(以上是关于并发限流利器Semaphore的主要内容,如果未能解决你的问题,请参考以下文章