Java Review - 并发编程_ 信号量Semaphore原理&源码剖析
Posted 小小工匠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java Review - 并发编程_ 信号量Semaphore原理&源码剖析相关的知识,希望对你有一定的参考价值。
文章目录
概述
Semaphore信号量也是Java中的一个同步器,与CountDownLatch和CycleBarrier不同的是,它内部的计数器是递增的,并且在一开始初始化Semaphore时可以指定一个初始值,但是并不需要知道需要同步的线程个数,而是在需要同步的地方调用acquire方法时指定需要同步的线程个数。
小Demo
同样下面的例子也是在主线程中开启两个子线程让它们执行,等所有子线程执行完毕后主线程再继续向下运行。
import java.time.LocalTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/12/14 23:59
* @mark: show me the code , change the world
*/
public class SemphoreTest
// 1 创建Sempaphore实例 当前信号量计数器的值为0
private static Semaphore semaphore = new Semaphore(0);
public static void main(String[] args) throws InterruptedException
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 线程1 提交到线程池
executorService.submit(() ->
System.out.println(Thread.currentThread().getName() + " 执行结束 " + LocalTime.now());
// 在每个线程内部调用信号量的release方法,这相当于让计数器值递增1
semaphore.release();
);
// 线程2 提交到线程池
executorService.submit(() ->
try
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " 执行结束 " + LocalTime.now());
// 在每个线程内部调用信号量的release方法,这相当于让计数器值递增1
semaphore.release();
catch (InterruptedException e)
e.printStackTrace();
);
// 1 等待子线程执行任务完成后返回
semaphore.acquire(2);
System.out.println(Thread.currentThread().getName() + "任务执行结束 " + LocalTime.now()) ;
// 关闭线程池
executorService.shutdown();
-
首先创建了一个信号量实例,构造函数的入参为0,说明当前信号量计数器的值为0
-
然后main函数向线程池添加两个线程任务,在每个线程内部调用信号量的release方法,这相当于让计数器值递增1
-
最后在main线程里面调用信号量的acquire方法,传参为2说明调用acquire方法的线程会一直阻塞,直到信号量的计数变为2才会返回
看到这里也就明白了,如果构造Semaphore时传递的参数为N,并在M个线程中调用了该信号量的release方法,那么在调用acquire使M个线程同步时传递的参数应该是M+N。
下面举个例子来模拟【CyclicBarrier复用】的功能,代码如下
import java.time.LocalTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/12/14 23:59
* @mark: show me the code , change the world
*/
public class SemphoreTest2
// 1 创建Sempaphore实例
private static Semaphore semaphore = new Semaphore(0);
public static void main(String[] args) throws InterruptedException
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 线程1 提交到线程池
executorService.submit(() ->
System.out.println(Thread.currentThread().getName() + " 执行结束 " + LocalTime.now());
// 在每个线程内部调用信号量的release方法,这相当于让计数器值递增1
semaphore.release();
);
// 线程2 提交到线程池
executorService.submit(() ->
try
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " 执行结束 " + LocalTime.now());
// 在每个线程内部调用信号量的release方法,这相当于让计数器值递增1
semaphore.release();
catch (InterruptedException e)
e.printStackTrace();
);
// 1 等待子线程执行任务完成后返回
semaphore.acquire(2);
// 线程3 提交到线程池
executorService.submit(() ->
System.out.println(Thread.currentThread().getName() + " 执行结束 " + LocalTime.now());
// 在每个线程内部调用信号量的release方法,这相当于让计数器值递增1
semaphore.release();
);
// 线程4 提交到线程池
executorService.submit(() ->
try
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " 执行结束 " + LocalTime.now());
// 在每个线程内部调用信号量的release方法,这相当于让计数器值递增1
semaphore.release();
catch (InterruptedException e)
e.printStackTrace();
);
// 2等待子线程执行任务完成后返回
semaphore.acquire(2);
System.out.println(Thread.currentThread().getName() + "任务执行结束 " + LocalTime.now()) ;
// 关闭线程池
executorService.shutdown();
-
首先将线程1和线程2加入到线程池。主线程执行代码(1)后被阻塞。线程1和线程2调用release方法后信号量的值变为了2,这时候主线程的aquire方法会在获取到2个信号量后返回(返回后当前信号量值为0)。
-
然后主线程添加线程3和线程4到线程池,之后主线程执行代码(2)后被阻塞(因为主线程要获取2个信号量,而当前信号量个数为0)。当线程3和线程4执行完release方法后,主线程才返回。
从本例子可以看出,Semaphore在某种程度上实现了CyclicBarrier的复用功能。
类关系概述
由该类图可知,Semaphore还是使用AQS实现的。Sync只是对AQS的一个修饰,并且Sync有两个实现类,用来指定获取信号量时是否采用公平策略。
例如,下面的代码在创建Semaphore时会使用一个变量指定是否使用公平策略。
public Semaphore(int permits)
sync = new NonfairSync(permits);
public Semaphore(int permits, boolean fair)
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
Sync(int permits)
setState(permits);
Semaphore默认采用非公平策略,如果需要使用公平策略则可以使用带两个参数的构造函数来构造Semaphore对象。
另外,如CountDownLatch构造函数传递的初始化信号量个数permits被赋给了AQS的state状态变量一样,这里AQS的state值也表示当前持有的信号量个数。
核心方法源码解读
void acquire()
public void acquire() throws InterruptedException
// 传递参数为1 ,说明要获取一个信号量资源
sync.acquireSharedInterruptibly(1);
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException
// 1 . 如果线程被中断,抛出被中断异常
if (Thread.interrupted())
throw new InterruptedException();
// 2 否则调用Syn子类方法尝试重新获取
if (tryAcquireShared(arg) < 0)
// 如果获取失败,则放入阻塞队列,然后再次尝试,如果失败则调用park方法挂起当前线程
doAcquireSharedInterruptibly(arg);
acquire()
在内部调用了Sync的acquireSharedInterruptibly
方法,后者会对中断进行响应(如果当前线程被中断,则抛出中断异常)。
尝试获取信号量资源的AQS的方法tryAcquireShared
是由Sync的子类实现的,所以这里分别从两方面来讨论。
非公平策略NonfairSync类的tryAcquireShared
方法
继续看下 nonfairTryAcquireShared
final int nonfairTryAcquireShared(int acquires)
for (;;)
// 获取当前信号量的值
int available = getState();
// 计算当前剩余值
int remaining = available - acquires;
// 如果当前值<0 或者 CAS设置成功则返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
-
先获取当前信号量值(available),然后减去需要获取的值(acquires),得到剩余的信号量个数(remaining)
-
如果剩余值小于0则说明当前信号量个数满足不了需求,那么直接返回负数,这时当前线程会被放入AQS的阻塞队列而被挂起。
-
如果剩余值大于0,则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。
另外,由于NonFairSync是非公平获取的,也就是说先调用aquire方法获取信号量的线程不一定比后来者先获取到信号量。
举个例子:
- 线程A先调用了aquire()方法获取信号量,但是当前信号量个数为0,那么线程A会被放入AQS的阻塞队列
- 过一段时间后线程C调用了release()方法释放了一个信号量,如果当前没有其他线程获取信号量,那么线程A就会被激活,然后获取该信号量
- 但是假如线程C释放信号量后,线程C调用了aquire方法,那么线程C就会和线程A去竞争这个信号量资源。
如果采用非公平策略,由nonfairTryAcquireShared的代码可知,线程C完全可以在线程A被激活前,或者激活后先于线程A获取到该信号量,也就是在这种模式下阻塞线程和当前请求的线程是竞争关系,而不遵循先来先得的策略。
公平策略FairSync
类的tryAcquireShared
方法
/**
* Fair version
*/
static final class FairSync extends Sync
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits)
super(permits);
protected int tryAcquireShared(int acquires)
for (;;)
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
可见公平性还是靠hasQueuedPredecessors
这个函数来保证的。前几篇博文里重点介绍了hasQueuedPredecessors
。 公平策略是看当前线程节点的前驱节点是否也在等待获取该资源,如果是则自己放弃获取的权限,然后当前线程会被放入AQS阻塞队列,否则就去获取。
void acquire(int permits)
该方法与acquire()方法不同,后者只需要获取一个信号量值,而前者则获取permits个。
public void acquire(int permits) throws InterruptedException
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
void acquireUninterruptibly()
该方法与acquire()类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了acquireUninterruptibly
获取资源时(包含被阻塞后),其他线程调用了当前线程的interrupt(
)方法设置了当前线程的中断标志,此时当前线程并不会抛出InterruptedException
异常而返回。
public void acquireUninterruptibly()
sync.acquireShared(1);
看看响应中断的
void acquireUninterruptibly(int permits)
该方法与acquire(int permits)方法的不同之处在于,该方法对中断不响应。
public void acquireUninterruptibly(int permits)
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
void release()
该方法的作用是把当前Semaphore对象的信号量值增加1,如果当前有线程因为调用aquire方法被阻塞而被放入了AQS的阻塞队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量。
public void release()
// 默认释放1个信号量
sync.releaseShared(1);
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if @link #tryReleaseShared returns true.
*
* @param arg the release argument. This value is conveyed to
* @link #tryReleaseShared but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from @link #tryReleaseShared
*/
public final boolean releaseShared(int arg)
// 2尝试释放资源
if (tryReleaseShared(arg))
// 3 资源释放成功,则调用park方法唤醒AQS 队列里最先挂起的线程
doReleaseShared();
return true;
return false;
protected final boolean tryReleaseShared(int releases)
for (;;)
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) // cas
return true;
由代码release()->sync.releaseShared(1)可知,release方法每次只会对信号量值增加1,tryReleaseShared方法是无限循环,使用CAS保证了release方法对信号量递增1的原子性操作。tryReleaseShared方法增加信号量值成功后会执行代码(3)doReleaseShared();
,即调用AQS的方法来激活因为调用aquire方法而被阻塞的线程。
private void doReleaseShared()
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;)
Node h = head;
if (h != null && h != tail)
int ws = h.waitStatus;
if (ws == Node.SIGNAL)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
if (h == head) // loop if head changed
break;
void release(int permits)
该方法与不带参数的release方法的不同之处在于,前者每次调用会在信号量值原来的基础上增加permits,而后者每次增加1。
public void release(int permits)
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
另外可以看到,这里的sync.releaseShared是共享方法,这说明该信号量是线程共享的,信号量没有和固定线程绑定,多个线程可以同时使用CAS去更新信号量的值而不会被阻塞。
小结
Semaphore也是使用AQS实现的,并且获取信号量时有公平策略和非公平策略之分。
以上是关于Java Review - 并发编程_ 信号量Semaphore原理&源码剖析的主要内容,如果未能解决你的问题,请参考以下文章
Java Review - 并发编程_ 信号量Semaphore原理&源码剖析