24.Semaphore的作用和原理
Posted 纵横千里,捭阖四方
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了24.Semaphore的作用和原理相关的知识,希望对你有一定的参考价值。
1.Semaphore的功能和作用
Semaphore就是信号灯的意思,主要功能是用来限制对某个资源同时访问的线性数量,它有两个核心方法:
-
acquire()方法,获取一个令牌。
-
release()方法,释放一个令牌。
如下图所示,当多个线程访问某个限制访问流量的资源时,需要先调用acquire()方法获得一个访问令牌,如果能正常获得,则表示允许访问,如果令牌不够,则会阻塞当前线程。当某个获得令牌的线程通过release()方法释放一个令牌后(令牌数量是固定的),被阻塞在acquire()方法的线程就有机会获得这个释放的令牌,从而获得访问权限。
我们看一个使用Semaphore的例子
public class SemaphoreExample
public static void main(String[] args)
Semaphore semaphore=new Semaphore(2);
ExecutorService service= Executors.newCachedThreadPool();
for (int i = 0; i < 1000; i++)
service.execute(new SomeTask(semaphore));
service.shutdown();
static class SomeTask implements Runnable
private Semaphore semaphore;
public SomeTask(Semaphore semaphore)
this.semaphore=semaphore;
@Override
public void run()
try
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+" 获得一个令牌");
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
finally
System.out.println(Thread.currentThread().getName()+" 释放一个令牌");
semaphore.release(1000);
这里定义了数量为2的令牌实例,然后定义了一个线程池来执行SomeTask任务。在SomeTask中,使用了semaphore.acquire()方法来限制最大访问线程数量,用来模拟远远超过令牌数的线程来访问SomeTask的场景。
Semaphore方法的核心就是一个许可证管理,通过acquire()方法获得一个许可证,通过release()方法释放一个许可证,实际上并没有真实的令牌发给线程,只是维护了一个可分配数量进行计数维护。
在Semaphore中有两个接口,一起看一下:
-
Semaphore(int permits, boolean fair),permits就是令牌数,fair表示公平性,也就是在令牌被释放的临界点是否允许提前抢占到令牌。
-
acquire(int permits) :获取指定数量的令牌,如果数量不足,则会阻塞当前线程。
-
tryAcquire(int permits) :尝试获取指定数量的令牌,此过程是非阻塞的,如果令牌数不够就返回false。
-
release(int permits):释放指定permits数量的令牌。
-
drainPermits():当前线程获得剩下的所有可用令牌。
-
hasQueuedThread():判断当前Semaphore实例上是否存在正在等待令牌的线程。
Semaphore常见的应用场景就是实现线程之间的限流,或者限制某些共享资源的访问数量。
2 Semaphore原理分析
Semaphore实际上也是基于AQS的共享锁来实现的,因为在Semaphore中允许多个线程获得令牌被唤醒。所以在基于AQS的实现上我们可以推测出,在构建Semaphore实例时传递的参数是permits,其实还是AQS中state属性,假设初始化是permits=5,那么每次调用release()方法,都是针对state进行递减。因此当state=5时,意味着所有的令牌都已经用完,后续的线程都会以共享锁类型添加到CLH队列中,而当state<5时,说明已经有其他线程获得令牌了,可以从CLH队列中唤醒头部的线程。
从根本上说,Semaphore就是通过重写AQS中的下面两个方法来实现不同的业务场景的。
-
tryAcquireShared()方法:抢占共享锁。
-
tryReleaseShared()方法:释放共享锁。
2.1 令牌获取过程
由于共享锁的整体源码已经分析过了,这里只列出Semaphore中不一样的内容。
public Semaphore(int permits)
sync = new NonfairSync(permits);
可以看到默认Semaphore是非公平策略,我们继续看NonfairSync类。
static final class NonfairSync extends Sync
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits)
super(permits);
protected int tryAcquireShared(int acquires)
return nonfairTryAcquireShared(acquires);
在非公平同步策略中,tryAcquireShared()方法直接调用nonfairTryAcquireShared()方法竞争共享锁,代码如下:
final int nonfairTryAcquireShared(int acquires)
for (;;)
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
不管当前AQS的CLH队列中是否有线程排队,对非公平策略来说,直接尝试竞争令牌,有可能再临界点的时候提前抢占到令牌。另外从nonfairTryAcquireShared()方法的实现中发现,所谓的抢占令牌资源,其实就是判断state变量的值。
-
remaining = available - acquires,用当前的令牌数量减去本次需要抢占的令牌数。
-
如果remaining<0,则说明令牌数量不够,直接返回remaining。
-
否则就更新state的值,该值表示本次抢占的令牌数量。
-
-
返回的remaining如果小于0,则直接让当前线程进入同步队列。
下面的的代码表示公平策略下的竞争令牌的方式,可以发现在通过CAS更新令牌数之前,多个了对hasQueuedPredecessors()方法的判断,该方法的返回结果表示当前同步队列中是否有其他线程在排队,如果有就返回true,这就是FIFO的特性。
static final class FairSync extends Sync
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits)
super(permits);
protected int tryAcquireShared(int acquires)
for (;;)
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
tryAcquireShared()方法返回的只如果小于0,说明令牌数不够,则调用doAcquireSharedInterruptibly()方法将当前线程加入到同步队列中,而同步队列的整个执行过程和上一节的CountDownLatch的执行过程完全一致。
//在Semaphore类中
public void acquire() throws InterruptedException
sync.acquireSharedInterruptibly(1);
//在AQS中
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
2.2 释放令牌的过程
通过release()方法释放令牌,本质上是对state字段的值进行累加,代码如下:
public void release()
sync.releaseShared(1);
#在AQS中
public final boolean releaseShared(int arg)
if (tryReleaseShared(arg))
doReleaseShared();
return true;
return false;
# Semaphore类中
protected final boolean tryReleaseShared(int releases)
for (;;)
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
我们从上述代码中发现 ,线程每调用一次release()方法就会释放一个令牌,实际上是对state变量的值进行累加,最终通过自旋的方式实现更新过程的原子性。
注意这时候release()方法并没有限制state累加的数量不能超过构造方法限制的permits数量,这意味着通过release()可以扩大令牌的数量,例如初始化时permits数量为5,调用release(1000)使得令牌数量变成1000,只要不超过int类型的整数值(next<current)就不会有问题。
另外,不是必须通过acquire()方法的线程来调用release()方法,任意一个线程都可以调用release()方法来释放令牌。这个是专门为开发者设计的“后门”,可以增加程序的灵活性。
还有就是增加的令牌数 ,可以通过reducePermits()方法进行减少,代码如下:
protected void reducePermits(int reduction)
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
final void reducePermits(int reductions)
for (;;)
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
这就意味着,通过release()和reducePermits()两个方法可以动态地对state令牌数实现增加和减少的调整。
以上是关于24.Semaphore的作用和原理的主要内容,如果未能解决你的问题,请参考以下文章
《非线性泛函分析导论(完): 形变定理与 MinMax 原理》
临界区(critical section 每个线程中访问 临界资源 的那段代码)和互斥锁(mutex)的区别(进程间互斥量共享内存虚拟地址)