JUCSemaphore源码分析

Posted LL.LEBRON

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUCSemaphore源码分析相关的知识,希望对你有一定的参考价值。

Semaphore

1.Semaphore概述

简介:Semaphore信号量,它保存了一系列的许可(permits),每次调用acquire()都将消耗一个许可,每次调用release()都将归还一个许可。

特性:Semaphore通常用于限制同一时间对共享资源的访问次数上,也就是常说的限流。

类结构:

Semaphore中包含了一个实现了AQS的同步器Sync,以及它的两个子类FairSyncNonFairSync,这说明Semaphore也是区分公平模式和非公平模式的。

基本使用:

/**
 * @author xppll
 * @date 2022/1/12 15:22
 */
@Slf4j(topic = "c.TestSemaphore")
public class TestSemaphore 
    public static void main(String[] args) 
        //1.创建 semaphore 对象
        Semaphore semaphore = new Semaphore(3);

        //2.创建10个线程同时运行
        for (int i = 0; i < 10; i++) 
            new Thread(() -> 
                //3.获取许可,许可数减一
                try 
                    semaphore.acquire();
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                try 
                    log.debug("running...");
                    Sleeper.sleep(1);
                    log.debug("end...");
                 finally 
                    //4.释放许可
                    semaphore.release();
                
            ).start();
        
    

结果:

2.Semaphore应用

  • 使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)
  • 用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好, 注意下面的实现中线程数和数据库连接数是相等的
@Slf4j
class Pool 
    //1.连接池大小
    private final int poolSize;
    //2.连接对象数组
    private Connection[] connections;
    //3.连接状态数组 0 空闲,1 繁忙
    private AtomicIntegerArray states;

    private Semaphore semaphore;

    //4.构造方法初始化
    public Pool(int poolSize) 
        this.poolSize = poolSize;
        //让许可数与资源数一致
        this.semaphore = new Semaphore(poolSize);
        this.connections = new Connection[poolSize];
        this.states = new AtomicIntegerArray(new int[poolSize]);
        for (int i = 0; i < poolSize; i++) 
            connections[i] = new MockConnection("连接" + (i + 1));
        
    

    //5.借连接
    public Connection borrow() 
        //获取许可
        try 
            semaphore.acquire();//没有许的线程,在此等待
         catch (InterruptedException e) 
            e.printStackTrace();
        
        for (int i = 0; i < poolSize; i++) 
            //获取空闲连接
            if (states.get(i) == 0) 
                if (states.compareAndSet(i, 0, 1)) 
                    log.debug("borrow", connections[i]);
                    return connections[i];
                
            
        
        //永远不会执行到这
        return null;
    

    //6.归还连接
    public void free(Connection conn) 
        for (int i = 0; i < poolSize; i++) 
            if (connections[i] == conn) 
                states.set(i, 0);
                log.debug("free", conn);
                semaphore.release();
                break;
            
        
    

3.Semaphore原理

Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一

刚开始,permits(state)为 3,这时 5 个线程来获取资源

假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列 park 阻塞

这时 Thread-4 释放了 permits,状态如下

接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

4.内部类Sync

//java.util.concurrent.Semaphore.Sync
abstract static class Sync extends AbstractQueuedSynchronizer 
    private static final long serialVersionUID = 1192457210091910933L;

    //构造方法,传入许可次数,放入state中
    Sync(int permits) 
        setState(permits);
    

    //获取许可次数
    final int getPermits() 
        return getState();
    

    //非公平模式尝试获取许可
    final int nonfairTryAcquireShared(int acquires) 
        for (;;) 
            //获得许可个数
            int available = getState();
            //减去这次所需的许可数
            int remaining = available - acquires;
            //如果许可数小于0直接返回
            //或者许可数不小于0,尝试原子更新state的值,成功了返回剩余许可
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        
    

    //释放许可
    protected final boolean tryReleaseShared(int releases) 
        for (;;) 
            //获得许可个数
            int current = getState();
            //加上这次所需的许可数
            int next = current + releases;
            //检测是否溢出
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            //如果原子更新state的值成功,就说明释放许可成功,则返回true
            if (compareAndSetState(current, next))
                return true;
        
    

    //减少许可
    final void reducePermits(int reductions) 
        for (;;) 
            //获得许可个数
            int current = getState();
            //减去需要减少的许可数
            int next = current - reductions;
            //检测是否溢出
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            //原子更新state的值,成功了返回true
            if (compareAndSetState(current, next))
                return;
        
    

    //销毁许可
    final int drainPermits() 
        for (;;) 
            //获得许可个数
            int current = getState();
            //如果许可数等于0直接返回
            //或者许可数不为0,尝试原子更新state值为0
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        
    

从内部类Sync的方法,我们可以知道:

  1. 许可是在构造方法时传入的
  2. 许可存放在状态变量state中
  3. 尝试获取一个许可的时候,则state的值减1
  4. 当state的值为0的时候,则无法再获取许可
  5. 释放一个许可的时候,则state的值加1
  6. 许可的个数可以动态改变

5.内部类NonfairSync

//java.util.concurrent.Semaphore.NonfairSync
static final class NonfairSync extends Sync 
    private static final long serialVersionUID = -2694183684443567898L;

    //构造方法,调用父类Sync的构造方法
    NonfairSync(int permits) 
        super(permits);
    

    //尝试获取许可,调用父类Sync的nonfairTryAcquireShared()方法
    protected int tryAcquireShared(int acquires) 
        return nonfairTryAcquireShared(acquires);
    

非公平模式下,直接调用父类的nonfairTryAcquireShared()尝试获取许可。

6.内部类FairSync

//java.util.concurrent.Semaphore.FairSync
static final class FairSync extends Sync 
    private static final long serialVersionUID = 2014338818796000944L;

    //构造方法,调用父类Sync的构造方法
    FairSync(int permits) 
        super(permits);
    

    //尝试获取许可
    protected int tryAcquireShared(int acquires) 
        for (; ; ) 
            //公平锁需要检查前面是否有排队的,如果有的话,获取许可失败,直接返回-1
            if (hasQueuedPredecessors())
                return -1;
            //获得许可个数
            int available = getState();
            //减去这次所需的许可数
            int remaining = available - acquires;
            //如果许可数小于0直接返回
            //或者许可数不小于0,尝试原子更新state的值,成功了返回剩余许可
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        
    

公平模式下,先检测前面是否有排队的,如果有排队的则获取许可失败,进入队列排队,否则尝试原子更新state的值。

7.构造方法

//构造方法,需要传入许可次数,默认非公平锁
public Semaphore(int permits) 
    sync = new NonfairSync(permits);


//构造方法,第二个参数可以决定是否公平,true->公平锁
public Semaphore(int permits, boolean fair) 
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);

  • 创建Semaphore时需要传入许可次数。
  • Semaphore默认也是非公平模式,但是你可以调用第二个构造方法声明其为公平模式。

8.其他方法

以下的方法都是针对非公平模式来描述。

8.1 acquire()方法

获取一个许可,默认使用的是可中断方式,如果尝试获取许可失败,会进入AQS的队列中排队。

public void acquire() throws InterruptedException 
    sync.acquireSharedInterruptibly(1);

8.2 acquireUninterruptibly()方法

获取一个许可,非中断方式,如果尝试获取许可失败,会进入AQS的队列中排队。

public void acquireUninterruptibly() 
    sync.acquireShared(1);

8.3 tryAcquire()方法

尝试获取一个许可,使用Sync的非公平模式尝试获取许可方法,不论是否获取到许可都返回,只尝试一次,不会进入队列排队。

public boolean tryAcquire() 
    return sync.nonfairTryAcquireShared(1) >= 0;

8.4 tryAcquire(long timeout, TimeUnit unit)方法

尝试获取一个许可,先尝试一次获取许可,如果失败则会等待timeout时间,这段时间内都没有获取到许可,则返回false,否则返回true;

public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException 
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

8.5 release()方法

释放一个许可,释放一个许可时state的值会加1,并且会唤醒下一个等待获取许可的线程。

public void release() 
    sync.releaseShared(1);

8.6 acquire(int permits)方法

一次获取多个许可,可中断方式。

public void acquire(int permits) throws InterruptedException 
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);

8.7 acquireUninterruptibly(int permits)方法

一次获取多个许可,非中断方式。

public void acquireUninterruptibly(int permits) 
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);

8.8 tryAcquire(int permits)方法

一次尝试获取多个许可,只尝试一次。

public boolean tryAcquire(int permits) 
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;

8.9 tryAcquire(int permits, long timeout, TimeUnit unit)方法

尝试获取多个许可,并会等待timeout时间,这段时间没获取到许可则返回false,否则返回true。

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException 
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));

8.10 release(int permits)方法

一次释放多个许可,state的值会相应增加permits的数量。

public void release(int permits) 
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);

8.11 availablePermits()方法

获取可用的许可次数。

public int availablePermits() 
    return sync.getPermits();

8.12 drainPermits()方法

销毁当前可用的许可次数,对于已经获取的许可没有影响,会把当前剩余的许可全部销毁。

public int drainPermits() 
    return sync.drainPermits();

8.13 reducePermits(int reduction)方法

减少许可的次数。

protected void reducePermits(int reduction) 
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);

以上是关于JUCSemaphore源码分析的主要内容,如果未能解决你的问题,请参考以下文章

Mesos源码分析

Mybatis源码分析

Spring源码分析专题——目录

ARouter源码分析

Handler源码分析

Eureka源码分析(六) TimedSupervisorTask