Semaphore 源码解读

Posted 小毕超

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Semaphore 源码解读相关的知识,希望对你有一定的参考价值。

一、Semaphore

Semaphore 通过设置一个固定数值的信号量,并发时线程通过 acquire() 获取一个信号量,如果能成功获得则可以继续执行,否则将阻塞等待,当某个线程使用 release() 释放一个信号量时,被阻塞的线程则可以被唤醒重新争抢信号量。根据该特征可以有效控制线程的并发数。

Semaphore 是如何控制并发的呢,本篇文章带领大家一起解读下 Semaphore 的源码。

在进行源码分析前,先回顾下 Semaphore 是如何使用的,例如下面一个案例:

public class Test 

    public static void main(String[] args) 
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 10; i++) 
            new Thread(() -> 
                try 
                    semaphore.acquire();
                    System.out.println("线程:" + Thread.currentThread().getName() + " 执行, 当前时间:" + LocalDateTime.now().toString());
                    Thread.sleep(1000);
                 catch (Exception e) 
                    e.printStackTrace();
                 finally 
                    semaphore.release();
                
            , String.valueOf(i)).start();
        
    

运行之后,可以看到下面日志:


可以看到每次都是 3 个并发。

在本专栏前面讲解 AQS 源码的时候提到 Semaphore 是基于 AQS 实现的,那是如何使用的 AQS 呢?

AQS 中,如果需要使用AQS的特征则需要子类根据使用的场景,重写下面方法:

//查询是否正在独占资源,condition会使用
boolean isHeldExclusively()	
//独占模式,尝试获取资源,成功则返回true,失败则返回false
boolean tryAcquire(int arg)
//独占模式,尝试释放资源,成功则返回true,失败则返回false
boolean tryRelease(int arg)
//共享模式,尝试获取资源,如果返回负数表示失败,否则表示成功。
int tryAcquireShared(int arg)
//共享模式,尝试释放资源,成功则返回true,失败则返回false。
boolean tryReleaseShared(int arg)

由于这里 Semaphore 的特性,所以下面我们只需关注共享模式下的几个方法即可。

说明:由于 Semaphore 的实现依赖于 AQS ,因此需要对 AQS 有一定的了解,不了解的小伙伴可以看下这篇对 AQS 源码分析的文章,和当前文章在同一专栏:

https://blog.csdn.net/qq_43692950/article/details/129367736

二、Semaphore 中 Sync、FairSync、NonfairSync

2.1 Sync、FairSync、NonfairSync

在声明 Semaphore 时,有两种方式,一种是使用只有一个 permits 参数的构造函数,一种则需要多增加一个 fair 参数:

new Semaphore(3);
new Semaphore(3, true);

当使用只有一个 permits 参数的构造函数声明时,则是创建了一个 NonfairSync 对象:

通过需要多增加一个 fair 参数的构造函数时,则可以根据传入的 fair 选择创建一个 FairSync 对象:

这里也不难理解 NonfairSyncFairSync 其实可以理解为 Semaphore 中的非公平锁和公平锁两种类型。

点到这两个类中,可以看到都继承自 Sync 类:


Sync 类,则继承自 AQS


到这里,我们就可以寻找几个关键的方法,在AQS中共享模式下,两大关键的方法是交由子类进行实现的,分别是 tryAcquireShared 尝试获取资源,和 tryReleaseShared 尝试释放资源。

首先来看 tryAcquireShared 尝试获取资源:

通过 Sync 类的实现源码发现并没有重写 tryAcquireShared方法,那该方法肯定在下面的FairSyncNonfairSync 子类中,分别看下源码确实存在重写的方法:


2.2 NonfairSync 下的 tryAcquireShared

这里先分析下 NonfairSynctryAcquireShared 实现逻辑,可以看到又调用了 nonfairTryAcquireShared 就是 Sync 类中的 nonfairTryAcquireShared ,从命名上可以分析出就是非公平锁的尝试获取资源的操作,直观就是非公平锁下获取锁的操作:

进入到 Sync 类中的 nonfairTryAcquireShared 方法中,可以明显看到一个自旋的操作,在循环中首先获取到 AQS 中的共享资源 state ,并对其进行 - acquires (默认为 1 ,后面会进行说明)操作,其实就是 -1 操作,如果减去的值小于 0 或者修改 state 成功,就返回当前减去的值,否则就自旋的方式再次重试:

上一步的操作主要做了什么目的呢,其实从 Sync 的构造方法就可以看出,创建 Semaphore 传递的 permits 参数被赋值给了 AQS 中的 state ,那此时 state 就记录的当前剩余信号量的大小,获取资源就要进行 -1 标识消耗了一个,最后将减去的值返回出去表示剩余的资源,如果信号量小于 0 了,则表示获取资源失败,直观就是获取锁失败。因为在 AQS 中对 tryAcquireShared 方法的判断是小于 0 时,进行线程的入列和挂起等待。


2.3 FairSync下的 tryAcquireShared

FairSync 类下的 tryAcquireShared 方法中,和前面 NonfairSync 类似,但不同的是,会首先进行 hasQueuedPredecessors 方法的判断:

下面进到 hasQueuedPredecessors 的方法中,可以看到是由 AQS 提供的方法,主要就是判断当前节点线程的前面是否还有等待的线程,因为 FairSync 实现的是公平锁的原则,如果当前线程前面还有等待线程,则获取锁资源也轮不到自个,让前面的老大先来,所以直接返回 -1 表示获取资源失败:

2.4 tryReleaseShared

到这里已经了解到了 tryAcquireShared尝试获取资源的逻辑,上面提到了两个重要方法,还有一个 tryReleaseShared 没有分析,还是首先看 Sync 类中是否有重写该方法:

通过源码可以看到,在 Sync 类中就已经对 tryReleaseShared 进行了重写,而 NonfairSyncFairSync 中都没有重写该方法,那释放资源就是走的 Sync 类下的 tryReleaseShared 方法:


在该方法同样使用了自旋,首先获取到 AQS 中的共享资源 state ,然后进行 + releases (默认情况下为 1 ,后面会说明),其实就是进行 +1 操作,并使用新的值修改 state ,如果修改失败的话则在自旋中继续修改,直到成功后返回 true ,表示释放资源成功。

看到这里就会发现获取资源和释放资源,无非就是对 AQS 中的共享资源 state 进行操作。理解了这两大核心的方法后,下面就可以看如何运用在 Semaphore 中的了。

三、semaphore.acquire()

通过 semaphore.acquire() 可以获取一个信号量,如果获取不到则阻塞等待,那semaphore.acquire() 主要做了什么呢?

下面点到该方法中,可以看到又调用了 sync.acquireSharedInterruptibly 方法,其实就是 AQS 中的 acquireSharedInterruptibly 方法,注意这里传递的参数为 1 ,对应上面括号中的说明:

AQSacquireSharedInterruptibly 方法中,首先会使用子类的 tryAcquireShared 方法获取资源,如果资源数小于 0 ,则认为获取失败,下面使用 doAcquireSharedInterruptibly 进行加入队列并挂起阻塞:

关于AQS如何加入队列和挂起,可以参考文章开始的链接中对 AQS 源码的解读。

四、semaphore.release()

上面在获取不到可用的资源时,则会被 AQS 挂起,因此这里还需要进行释放资源。

下面点到 semaphore.release() 方法中,可以看到又调用了 sync.releaseShared ,其实就是 AQS 中的 releaseShared 方法,注意这里参数默认为 1 ,对应上面括号中的说明:


AQSreleaseShared 方法中,会首先调用子类的 tryReleaseShared 释放资源,释放成功后,会使用 doReleaseShared 进行挂起线程的唤醒:

关于releaseShared方法的源码解读可以参考文章开始的链接中对 AQS 源码的解读。

三、总结

通过阅读 Semaphore的源码可以发现,大量依赖于 AQS 中提供的方法,如果有阅读过本专栏对 ReentrantLock 锁源码的分析,可以发现相似度极高,都是使用 AQS 所提供的的特征实现某些场景的应用。

以上是关于Semaphore 源码解读的主要内容,如果未能解决你的问题,请参考以下文章

Java Review - 并发编程_ 信号量Semaphore原理&源码剖析

Java Review - 并发编程_ 信号量Semaphore原理&源码剖析

AQS 源码解读

死磕 java同步系列之Semaphore源码解析

死磕 java同步系列之Semaphore源码解析

Java并发:Semaphore信号量源码分析