并发限流利器Semaphore

Posted 陈皮的JavaLib

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发限流利器Semaphore相关的知识,希望对你有一定的参考价值。

1 Semaphore 是什么?


Semaphore,一个计数信号量,JDK 1.5 开始提供的一个同步工具。Semaphore 信号量被用来限制对某些资源同时访问的线程数量。例如接口限流,控制一个文件允许的并发访问数等等。

从概念上讲,Semaphore 维护着一组许可证,每一个需要访问资源的线程都需要从 Semaphore 拿到许可证。

其实,没有真正意义上的许可证对象,Semaphore 是通过维护一个数字来代表可获取的许可证数量,并进行加减操作。每一个调用 acquire 方法的线程会阻塞直到拿到许可证为止;调用 release 方法则会向 Semaphore 增加一个许可证。

而且使用 Semaphore 时,锁还可以被其他线程释放(即增加许可证),这在某些特定情况下非常有用,例如可用于死锁的解除。

package com.chenpi;

import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * @Description
 * @Author 陈皮
 * @Date 2021/7/11
 * @Version 1.0
 */
public class ChenPiMain {

  public static void main(String[] args) {

    // 许可证数量为2,即代表同时只能有2个线程能访问资源
    Semaphore semaphore = new Semaphore(2);

    for (int i = 0; i < 5; i++) {
      new Thread(() -> {
        try {
          semaphore.acquire();
          System.out.println(Thread.currentThread().getName() + "开始执行...");
          TimeUnit.SECONDS.sleep(new Random().nextInt(5) + 1);
          System.out.println(Thread.currentThread().getName() + "结束执行...");
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          semaphore.release();
        }
      }).start();
    }
  }
}

// 输出结果如下
Thread-1开始执行...
Thread-0开始执行...
Thread-1结束执行...
Thread-3开始执行...
Thread-3结束执行...
Thread-2开始执行...
Thread-0结束执行...
Thread-4开始执行...
Thread-4结束执行...
Thread-2结束执行...

2 Semaphore 充当互斥锁


只有一个许可证的信号量,可用作互斥锁(类似于 synchronized ),也通常被称为二进制信号量,因为它只有两种状态,一种状态是当前有一个许可证可被获取,另一种状态是当前没有许可证可被获取。

package com.chenpi;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

/**
 * @Description
 * @Author 陈皮
 * @Date 2021/7/11
 * @Version 1.0
 */
public class ChenPiMain {

  // 共享资源
  private static int count = 0;
  // 线程数量
  private static final int MAX_THREAD = 1000;

  public static void main(String[] args) throws InterruptedException {

    // 许可证数量为1,充当互斥锁
    Semaphore semaphore = new Semaphore(1);

    List<Thread> threads = new ArrayList<>(MAX_THREAD);

    for (int i = 0; i < MAX_THREAD; i++) {
      Thread thread = new Thread(() -> {
        try {
          semaphore.acquire();
          // 每一个线程都对共享资源操作  
          count++;
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          semaphore.release();
        }
      });
      threads.add(thread);
      thread.start();
    }

    for (Thread thread : threads) {
      thread.join();
    }

    System.out.println(count); // 输出结果1000

  }
}

3 Semaphore 源码解析


Semaphore 的类结构图如下:

Semaphore 底层主要通过 AQS(AbstractQueuedSynchronizer)来实现的。我们在构造 Semaphore 时,传入的许可证数量,最终传递给了 AQS 的 state 属性,许可证数量即代表允许同时多少线程访问资源。

public class Semaphore implements java.io.Serializable {
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    static final class NonfairSync extends Sync {

        NonfairSync(int permits) {
            super(permits);
        }
        ...
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {

        Sync(int permits) {
            setState(permits);
        }
        ...
    }
    ...
}

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    private volatile int state;0
    
    protected final void setState(int newState) {
        state = newState;
    }
    ...
}

Semaphore 的构造方法接收的许可证数量参数,值可以为负数,这样的话需要保证在调用 acquires 方法之前调用过 releases 方法来增加许可证,不然会导致没有线程能获取到许可证。

// 许可证数量为负数
Semaphore semaphore = new Semaphore(-1);
// 所以需要先调用release,那么acquires方法才有意义
semaphore.release();
semaphore.release();
...
semaphore.acquire();

其实 Semaphore 的重点还是 AQS(AbstractQueuedSynchronizer),它只是对 AQS 的封装调整而已。Semaphore 类内部定义了一个抽象静态内部类 Sync,它负责 Semaphore 的同步实现,Sync 还继承了抽象类 AQS,所以间接使用了 AQS 类的相关方法。

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {
        setState(permits);
    }
    ...
}

Semaphore 的构造方法不仅接收许可证数量,还可以接收一个布尔类型的参数 fairness ,代表是否为公平锁模式还是非公平锁模式。

如果为公平锁则可以使最先调用 acquire 方法的线程优先获取许可证,即阻塞队列中排在前面的线程优先获取到许可证(FIFO)。

如果是非公平锁,则不保证线程获得许可证的顺序,即使阻塞队列中有等待的线程,一个新调用 acquire 的线程会先尝试获取许可证,如果许可证数量不满足才插入队列中进行排队。即一个线程 a 调用 acquire 方法尝试获取许可时,这时刚好有线程 b 释放了许可,并唤醒阻塞队列中第一个等待的线程 b,此时线程a 和线程 b 会共同竞争释放出来的许可证,即线程 a 没有进阻塞队列等待就和线程 b 一起竞争许可证了。

注意,FIFO 排序必须适用于这些方法中的特定内部执行点。 因此,一个线程可能在另一个线程之前调用 acquire 方法,但可能在另一个线程之后到达排序点,类似地,从方法返回时也是如此。所以不能绝对认为先调用 acquire 方法的线程一定先获取到许可证。

还有,不计时的 tryAcquire 方法不会遵循公平原则,它也是会获取目前可用的许可证,而不管前面是否有线程在排队。

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

NonfairSyncFairSync 两个 final 类都是 Sync 的子类,即公平版本和非公平版本。他们都重写了 tryAcquireShared 方法,这个是定义在 AQS 类的方法(方法实现只抛出异常,即要求让实现类重写),

非公平锁模式下尝试获取共享锁的源代码如下,会发现线程直接尝试获取许可,如果许可不够才进入阻塞队列。

// 自旋 + CAS 尝试获取锁
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // 可获得的许可证数量
        int available = getState();
        // 预期剩余的许可证数量
        int remaining = available - acquires;
        // 如果许可证数量小于0或者cas设置state成功,则退出
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

公平锁模式下尝试获取共享锁的源代码如下,会发现线程尝试获取许可之前,会先判断队列中是否有线程在等待。

// 自旋 + CAS 尝试获取锁
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 判断等待队列是否有线程
        if (hasQueuedPredecessors())
            return -1;
        // 可获得的许可证数量
        int available = getState();
        // 预期剩余的许可证数量
        int remaining = available - acquires;
        // 如果许可证数量小于0或者cas设置state成功,则退出
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

通常情况下,semaphore 被用于控制资源的线程访问数量时,一般初始化为公平锁,这样不会产生饥饿线程。当然,当 semaphore 用于其他类型的同步控制的时候,非公平锁的吞吐量优于公平锁的。

对于释放许可证,源码如下,也是基于 cas 机制进行释放许可证。

public void release() {
    // 释放一个许可证
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    // 尝试释放
    if (tryReleaseShared(arg)) {
        // 唤醒阻塞的线程
        doReleaseShared();
        return true;
    }
    return false;
}

// 自旋 + CAS 尝试释放锁
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        // 负数代表越界
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

内存一致性协议,release 方法调用 happen-before 于 acquire 方法调用。

默认情况下,我们一般一个线程获取一个许可,或者释放一个许可。当然,Semaphore 也支持一次获取或者释放多个许可的,源码如下所示:

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

acquire() 方法是获取许可时,是可被中断的,如果想不被中断,可以使用 acquireUninterruptibly() 方法,

// 可被中断模式
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 检测线程是否被中断
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        // 此方法内部也会抛出中断异常
        doAcquireSharedInterruptibly(arg);
}


// 不可被中断模式
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

上面提到的 acquire 如果获取不到许可证的时候,是会一直阻塞的。如果向此时获取不到许可的话,不阻塞马上返回或者等待指定的时间,可以调用如下方法:

// 尝试获取共享锁,不阻塞。而且不管公平还是非公平模式,都不遵循FIFO原则
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

// 计时的尝试获取共享锁
public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

4 Semaphore 主要方法


  • void acquire():获取一个许可证,在获得一个许可证前线程将一直阻塞,或者线程被中断。
  • void acquire(int permits):获取指定数量的许可证,在获得一个许可证前线程将一直阻塞,或者线程被中断。
  • boolean tryAcquire(int permits):尝试获取指定数量的许可证,不阻塞。
  • boolean tryAcquire(int permits, long timeout, TimeUnit unit):尝试在指定时间内获取指定数量的许可证。
  • void release():释放一个许可证,将其返回给信号量。
  • void release(int permits):释放给定数量的许可证,将它们返回给信号量。
  • int availablePermits():返回当前可用的许可证数,此方法通常用于调试和测试目的。
  • boolean hasQueuedThreads():查询是否有线程正在等待获取。
  • int drainPermits():一次性取走剩下的所有许可证。
  • Collection<Thread> getQueuedThreads():获取在排队中的线程列表。
  • int getQueueLength():获取排队列表的长度。
  • boolean hasQueuedThreads():阻塞队列中是否还有等待的线程。
  • void reducePermits(int reduction):减少指定数量的许可证,不会唤醒阻塞中的线程。

5 Semaphore 应用案例


下面介绍下 Semaphore 的简单应用,即某公司只有2个厕所坑位,每一个坑位同时只允许一个员工带薪拉屎使用,而且上厕所讲究先来后到,所以使用公平锁模式。

package com.chenpi;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * @Description
 * @Author 陈皮
 * @Date 2021/9/11
 * @Version 1.0
 */
public class Pool {
  // 代表厕所的个数
  private static final int MAX_AVAILABLE = 2;
  private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

  // 争抢厕所  
  public Toilet getItem() throws InterruptedException {
    available.acquire();
    return getNextAvailableItem();
  }

  // 使用完退回厕所  
  public void putItem(Toilet x) {
    if (markAsUnused(x)) {
      available.release();
    }
  }

  // 资源,2个厕所
  protected Toilet[] items = {new Toilet("一号厕所"), new Toilet("二号厕所")};
  // 标识厕所是否在使用中
  protected boolean[] used = new boolean[MAX_AVAILABLE];

  protected synchronized Toilet getNextAvailableItem() {
    for (int i = 0; i < MAX_AVAILABLE; ++i) {
      if (!used[i]) {
        used[i] = true;
        return items[i];
      }
    }
    return null;
  }

  protected synchronized boolean markAsUnused(Toilet item) {
    for (int i = 0; i < MAX_AVAILABLE; ++i) {
      if (item == items[i]) {
        if (used[i]) {
          used[i] = false;
          return true;
        } else {
          return false;
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {

    Pool pool = new Pool();

    for (int i = 0; i < 10; i++) {
      new Thread(() -> {
        try {
          Toilet toilet = pool.getItem();
          SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
          System.out.println(Thread.currentThread().getName() + "开始使用" + toilet.getName(以上是关于并发限流利器Semaphore的主要内容,如果未能解决你的问题,请参考以下文章

java高并发系列 - 第15天:JUC中的Semaphore,最简单的限流工具类,必备技能

Java使用Semaphore对单接口进行限流

并发编程之Semaphore源码解析

并发编程之Semaphore源码解析

高并发服务限流实践

用synchronized实现Semaphore