JUCSemaphore源码分析
Posted LL.LEBRON
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUCSemaphore源码分析相关的知识,希望对你有一定的参考价值。
文章目录
- Semaphore
- 1.Semaphore概述
- 2.Semaphore应用
- 3.Semaphore原理
- 4.内部类Sync
- 5.内部类NonfairSync
- 6.内部类FairSync
- 7.构造方法
- 8.其他方法
- 8.1 acquire()方法
- 8.2 acquireUninterruptibly()方法
- 8.3 tryAcquire()方法
- 8.4 tryAcquire(long timeout, TimeUnit unit)方法
- 8.5 release()方法
- 8.6 acquire(int permits)方法
- 8.7 acquireUninterruptibly(int permits)方法
- 8.8 tryAcquire(int permits)方法
- 8.9 tryAcquire(int permits, long timeout, TimeUnit unit)方法
- 8.10 release(int permits)方法
- 8.11 availablePermits()方法
- 8.12 drainPermits()方法
- 8.13 reducePermits(int reduction)方法
Semaphore
1.Semaphore概述
简介:Semaphore
,信号量,它保存了一系列的许可(permits),每次调用acquire()都将消耗一个许可,每次调用release()都将归还一个许可。
特性:Semaphore
通常用于限制同一时间对共享资源的访问次数上,也就是常说的限流。
类结构:
Semaphore中包含了一个实现了AQS的同步器Sync
,以及它的两个子类FairSync
和NonFairSync
,这说明Semaphore也是区分公平模式和非公平模式的。
基本使用:
/**
* @author xppll
* @date 2022/1/12 15:22
*/
@Slf4j(topic = "c.TestSemaphore")
public class TestSemaphore
public static void main(String[] args)
//1.创建 semaphore 对象
Semaphore semaphore = new Semaphore(3);
//2.创建10个线程同时运行
for (int i = 0; i < 10; i++)
new Thread(() ->
//3.获取许可,许可数减一
try
semaphore.acquire();
catch (InterruptedException e)
e.printStackTrace();
try
log.debug("running...");
Sleeper.sleep(1);
log.debug("end...");
finally
//4.释放许可
semaphore.release();
).start();
结果:
2.Semaphore应用
- 使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)
- 用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好, 注意下面的实现中线程数和数据库连接数是相等的
@Slf4j
class Pool
//1.连接池大小
private final int poolSize;
//2.连接对象数组
private Connection[] connections;
//3.连接状态数组 0 空闲,1 繁忙
private AtomicIntegerArray states;
private Semaphore semaphore;
//4.构造方法初始化
public Pool(int poolSize)
this.poolSize = poolSize;
//让许可数与资源数一致
this.semaphore = new Semaphore(poolSize);
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++)
connections[i] = new MockConnection("连接" + (i + 1));
//5.借连接
public Connection borrow()
//获取许可
try
semaphore.acquire();//没有许的线程,在此等待
catch (InterruptedException e)
e.printStackTrace();
for (int i = 0; i < poolSize; i++)
//获取空闲连接
if (states.get(i) == 0)
if (states.compareAndSet(i, 0, 1))
log.debug("borrow", connections[i]);
return connections[i];
//永远不会执行到这
return null;
//6.归还连接
public void free(Connection conn)
for (int i = 0; i < poolSize; i++)
if (connections[i] == conn)
states.set(i, 0);
log.debug("free", conn);
semaphore.release();
break;
3.Semaphore原理
Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一
刚开始,permits(state)为 3,这时 5 个线程来获取资源
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列 park 阻塞
这时 Thread-4 释放了 permits,状态如下
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
4.内部类Sync
//java.util.concurrent.Semaphore.Sync
abstract static class Sync extends AbstractQueuedSynchronizer
private static final long serialVersionUID = 1192457210091910933L;
//构造方法,传入许可次数,放入state中
Sync(int permits)
setState(permits);
//获取许可次数
final int getPermits()
return getState();
//非公平模式尝试获取许可
final int nonfairTryAcquireShared(int acquires)
for (;;)
//获得许可个数
int available = getState();
//减去这次所需的许可数
int remaining = available - acquires;
//如果许可数小于0直接返回
//或者许可数不小于0,尝试原子更新state的值,成功了返回剩余许可
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
//释放许可
protected final boolean tryReleaseShared(int releases)
for (;;)
//获得许可个数
int current = getState();
//加上这次所需的许可数
int next = current + releases;
//检测是否溢出
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//如果原子更新state的值成功,就说明释放许可成功,则返回true
if (compareAndSetState(current, next))
return true;
//减少许可
final void reducePermits(int reductions)
for (;;)
//获得许可个数
int current = getState();
//减去需要减少的许可数
int next = current - reductions;
//检测是否溢出
if (next > current) // underflow
throw new Error("Permit count underflow");
//原子更新state的值,成功了返回true
if (compareAndSetState(current, next))
return;
//销毁许可
final int drainPermits()
for (;;)
//获得许可个数
int current = getState();
//如果许可数等于0直接返回
//或者许可数不为0,尝试原子更新state值为0
if (current == 0 || compareAndSetState(current, 0))
return current;
从内部类Sync的方法,我们可以知道:
- 许可是在构造方法时传入的
- 许可存放在状态变量state中
- 尝试获取一个许可的时候,则state的值减1
- 当state的值为0的时候,则无法再获取许可
- 释放一个许可的时候,则state的值加1
- 许可的个数可以动态改变
5.内部类NonfairSync
//java.util.concurrent.Semaphore.NonfairSync
static final class NonfairSync extends Sync
private static final long serialVersionUID = -2694183684443567898L;
//构造方法,调用父类Sync的构造方法
NonfairSync(int permits)
super(permits);
//尝试获取许可,调用父类Sync的nonfairTryAcquireShared()方法
protected int tryAcquireShared(int acquires)
return nonfairTryAcquireShared(acquires);
非公平模式下,直接调用父类的nonfairTryAcquireShared()
尝试获取许可。
6.内部类FairSync
//java.util.concurrent.Semaphore.FairSync
static final class FairSync extends Sync
private static final long serialVersionUID = 2014338818796000944L;
//构造方法,调用父类Sync的构造方法
FairSync(int permits)
super(permits);
//尝试获取许可
protected int tryAcquireShared(int acquires)
for (; ; )
//公平锁需要检查前面是否有排队的,如果有的话,获取许可失败,直接返回-1
if (hasQueuedPredecessors())
return -1;
//获得许可个数
int available = getState();
//减去这次所需的许可数
int remaining = available - acquires;
//如果许可数小于0直接返回
//或者许可数不小于0,尝试原子更新state的值,成功了返回剩余许可
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
公平模式下,先检测前面是否有排队的,如果有排队的则获取许可失败,进入队列排队,否则尝试原子更新state的值。
7.构造方法
//构造方法,需要传入许可次数,默认非公平锁
public Semaphore(int permits)
sync = new NonfairSync(permits);
//构造方法,第二个参数可以决定是否公平,true->公平锁
public Semaphore(int permits, boolean fair)
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
- 创建Semaphore时需要传入许可次数。
- Semaphore默认也是非公平模式,但是你可以调用第二个构造方法声明其为公平模式。
8.其他方法
以下的方法都是针对非公平模式来描述。
8.1 acquire()方法
获取一个许可,默认使用的是可中断方式,如果尝试获取许可失败,会进入AQS的队列中排队。
public void acquire() throws InterruptedException
sync.acquireSharedInterruptibly(1);
8.2 acquireUninterruptibly()方法
获取一个许可,非中断方式,如果尝试获取许可失败,会进入AQS的队列中排队。
public void acquireUninterruptibly()
sync.acquireShared(1);
8.3 tryAcquire()方法
尝试获取一个许可,使用Sync的非公平模式尝试获取许可方法,不论是否获取到许可都返回,只尝试一次,不会进入队列排队。
public boolean tryAcquire()
return sync.nonfairTryAcquireShared(1) >= 0;
8.4 tryAcquire(long timeout, TimeUnit unit)方法
尝试获取一个许可,先尝试一次获取许可,如果失败则会等待timeout时间,这段时间内都没有获取到许可,则返回false,否则返回true;
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
8.5 release()方法
释放一个许可,释放一个许可时state的值会加1,并且会唤醒下一个等待获取许可的线程。
public void release()
sync.releaseShared(1);
8.6 acquire(int permits)方法
一次获取多个许可,可中断方式。
public void acquire(int permits) throws InterruptedException
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
8.7 acquireUninterruptibly(int permits)方法
一次获取多个许可,非中断方式。
public void acquireUninterruptibly(int permits)
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
8.8 tryAcquire(int permits)方法
一次尝试获取多个许可,只尝试一次。
public boolean tryAcquire(int permits)
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
8.9 tryAcquire(int permits, long timeout, TimeUnit unit)方法
尝试获取多个许可,并会等待timeout时间,这段时间没获取到许可则返回false,否则返回true。
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
8.10 release(int permits)方法
一次释放多个许可,state的值会相应增加permits的数量。
public void release(int permits)
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
8.11 availablePermits()方法
获取可用的许可次数。
public int availablePermits()
return sync.getPermits();
8.12 drainPermits()方法
销毁当前可用的许可次数,对于已经获取的许可没有影响,会把当前剩余的许可全部销毁。
public int drainPermits()
return sync.drainPermits();
8.13 reducePermits(int reduction)方法
减少许可的次数。
protected void reducePermits(int reduction)
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
以上是关于JUCSemaphore源码分析的主要内容,如果未能解决你的问题,请参考以下文章