JDK类库源码分析系列2-AbstractQueuedSynchronizer-Semaphore

Posted _微风轻起

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK类库源码分析系列2-AbstractQueuedSynchronizer-Semaphore相关的知识,希望对你有一定的参考价值。

一、简介说明

​ 这一篇我们来介绍下AbstractQueuedSynchronizer的另一个应用类Semaphore。其有些类似上一篇的CountDownLatch,不过CountDownLatch其是初始化一个数,然后往下降一直到0。而Semaphore是初始化一个数N,然后我们将这个值理解为容量,这个容量最大只能为N了,可以往下降到0,当同时也能添加到最大N。我们可以借助这个来实现最大的并发数量控制,有点类似与令牌桶。

​ 下面我们就来具体梳理下这个类。

一、基本结构

1、Semaphore

public class Semaphore implements java.io.Serializable {

​ 可以看到这个CountDownLatch的继承实现关系很简单,就一个序列化接口。

二、变量

1、sync

private final Sync sync;

​ 其变量与CountDownLatch一样,也就一个sync

三、Sync子类

1、结构

abstract static class Sync extends AbstractQueuedSynchronizer {

​ 其也是直接继承的AQS

2、方法

1)、Sync(int count)

Sync(int permits) {
    setState(permits);
}

​ 这个构造方法是直接将入参赋值给AQSstatus属性,permits也就是表示许可的数量限制。

2)、getPermits()

final int getPermits() {
    return getState();
}

​ 获取剩余的许可数量,也就是获取status的值。

3)、nonfairTryAcquireShared(int acquires)

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

​ 以非公平的方式尝试获取共享模式锁,并返回最新值remaining,我们可以看到其是一个for(;;),这个就是将status通过CAS设置新值remaining(available - acquires)。

4)、tryReleaseShared(int releases)

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

​ 以共享模式释放锁。不过我们可以看到这个释放是current + releases,也就是添加,这里与其他是有区别的。释放,其就是我们释放一份资源,而这份资源就可以被其他的线程获取,也就是可被使用的资源多了一份。

5)、reducePermits(int reductions)

final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return;
    }
}

​ 这个方法就是提供的减少资源的方法。

6)、drainPermits()

final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}

​ 这个是排空资源,也就是将资源status置为0

四、FairSync子类

1、结构

static final class FairSync extends Sync {

​ 其是继承前面的Sync

2、方法

1)、FairSync(int permits)

FairSync(int permits) {
    super(permits);
}

​ 进行初始化设值status

2)、tryAcquireShared(int acquires)

protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

​ 以共享的模式尝试获取资源,返回本次更新的值。首先是通过hasQueuedPredecessors()方法判断有没有前置节点,有的话直接返回-1表示获取失败,需要等待它前面的先获取,然后就通过CAS获取设置status

五、NonfairSync

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

​ 以非公平的方式获取资源,其是直接调用的SyncnonfairTryAcquireShared(acquires)方法,关键就是没有hasQueuedPredecessors()的判断。

六、方法

1、Semaphore(int permits)

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

​ 这个方法默认的是NonfairSync

2、Semaphore(int permits, boolean fair)

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

​ 自己选择看是以公平还是非公平。

3、acquire()

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

​ 获取资源,可以看到其入参是1

4、acquireUninterruptibly()

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

​ 这个获取资源的方式是没有抛出InterruptedException异常。

5、tryAcquire()

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

​ 尝试获取资源,如果获取>0,就表示获取成功。

6、tryAcquire(long timeout, TimeUnit unit)

public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

​ 有时间限制的获取。

7、release()

public void release() {
    sync.releaseShared(1);
}

​ 释放资源,入参也是类似acquire()方法为1

8、availablePermits()

public int availablePermits() {
    return sync.getPermits();
}

​ 还剩余的资源许可数。

9、drainPermits()

public int drainPermits() {
    return sync.drainPermits();
}

​ 这个方式是直接调用前面的,将资源许可置为0

10、reducePermits(int reduction)

protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

​ 消减资源许可。

以上是关于JDK类库源码分析系列2-AbstractQueuedSynchronizer-Semaphore的主要内容,如果未能解决你的问题,请参考以下文章

JDK类库源码分析系列2-AbstractQueuedSynchronizer-CountDownLatch

JDK类库源码分析系列2-AbstractQueuedSynchronizer-ReentrantReadWriteLock

JDK类库源码分析系列2-AbstractQueuedSynchronizer-Semaphore

JDK源码分析实战系列-ThreadLocal

源码分析系列1:HashMap源码分析(基于JDK1.8)

《JDK源码分析》相关系列目录(JAVA 小虚竹)