Java并发工具类:Semaphore

Posted 流楚丶格念

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发工具类:Semaphore相关的知识,希望对你有一定的参考价值。

文章目录

Semaphore

Semaphore概念

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,通过协调各个线程以保证合理地使用公共资源。

Semaphore通过使用计数器来控制对共享资源的访问。 如果计数器大于0,则允许访问。 如果为0,则拒绝访问。 计数器所计数的是允许访问共享资源的许可。 因此,要访问资源,必须从信号量中授予线程许可。

常用方法

常用方法:

方法说明
void acquire()从信号量获取一个许可,如果无可用许可前将一直阻塞等待,
void acquire(int permits)获取指定数目的许可,如果无可用许可前也将会一直阻塞等待
boolean tryAcquire()从信号量尝试获取一个许可,如果无可用许可,直接返回false,不会阻塞
boolean tryAcquire(int permits)尝试获取指定数目的许可,如果无可用许可直接返回false
boolean tryAcquire(int permits, long timeout, TimeUnit unit)在指定的时间内尝试从信号量中获取许可,如果在指定的时间内获取成功,返回true,否则返回false
void release()释放一个许可,别忘了在finally中使用,注意:多次调用该方法,会使信号量的许可数增加,达到动态扩展的效果,如:初始permits为1,调用了两次release,最大许可会改变为2
int availablePermits()获取当前信号量可用的许可

Semaphore原理

Semaphore相关类图如下:

根据UML类图,可以很明显的看出来Semaphore是直接使用AQS实现的。

在构造器部分,如同 CountDownLatch 构造函数传递的初始化计数个数count被赋给了AQS 的state 状态变量一样,Semaphore的信号量个数permits同样赋给了AQS 的state 值。

Semaphore():构造函数

源码如下:

public Semaphore(int permits) 
    sync = new NonfairSync(permits);


public Semaphore(int permits, boolean fair) 
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);


  • permits 初始许可数,也就是最大访问线程数
  • fair 当设置为false时,创建的信号量为非公平锁;当设置为true时,信号量是公平锁

FairSync&NonfairSync实现

Semaphore还分别实现了公平模式FairSync和非公平模式NonfairSync两个内部类。

实际上公平与非公平只是在获取信号量的时候得到体现,它们的释放信号量的方法都是一样的,这就类似于ReentrantLock:公平与非公平只是在获取锁的时候得到体现,它们的释放锁的方法都是一样的

在创建Semaphore时可以使用一个fair变量指定是否使用公平策略,默认是非公平的模式。公平模式会确保所有等待的获取信号量的线程按照先进先出的顺序获取信号量,而非公平模式则没有这个保证。非公平模式的吞吐量比公平模式的吞吐量要高,而公平模式则可以避免线程饥饿。

实现源码如下:

/**
 * 保存某个AQS子类实例
 */
private final Sync sync;

/**
 * 创建具有给定的信号量数和非公平的公平模式的 Semaphore。
 *
 * @param permits 初始的可用信号量数目。此值可能为负数,在这种情况下,必须在授予任何获取信号量前进行释放信号量。
 */
public Semaphore(int permits) 
    //默认初始化NonfairSync实例
    sync = new NonfairSync(permits);


/**
 * 创建具有给定的信号量数和给定的公平设置的 Semaphore。
 *
 * @param permits 初始的可用信号量数目。此值可能为负数,在这种情况下,必须在授予任何获取信号量前进行释放信号量。
 * @param fair    如果此信号量保证在争用时按先进先出的顺序授予信号量,则为 true;否则为 false。
 */
public Semaphore(int permits, boolean fair) 
    //根据fair参数选择初始化一个公平FairSync类或者非公平NonfairSync类的实例
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);



/**
 * 非公平模式的实现
 */
static final class NonfairSync extends Sync 
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) 
        super(permits);
    



/**
 * 公平模式的实现
 */
static final class FairSync extends Sync 
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) 
        super(permits);
    



/**
 * 信号量的同步实现。 使用 AQS 的state状态表示信号量。子分类为公平和非公平模式。
 */
abstract static class Sync extends AbstractQueuedSynchronizer 
    private static final long serialVersionUID = 1192457210091910933L;

    /**
     * 构造器
     *
     * @param permits 初始的可用信号量数目。
     */
    Sync(int permits) 
        //被设置为state值
        setState(permits);
    


acquire():中断获取信号量

public void acquire()

可中断的获取一个信号量,没有则一直阻塞,直到在其他线程提供信号量并唤醒该线程或者线程被中断。获取一个信号量就立即返回,将可用的信号量数减 1。 如果调用此方法时已被中断或者等待时被中断,则抛出 InterruptedException,并且清除当前线程的已中断状态。

public void acquire(int permits)

可中断的获取permits 个信号量,内部调用AQS的acquireSharedInterruptibly方法,这实际上就是共享式可中断获取资源的模版方法,因此Semaphore和CountDownLatch一样都是基于共享资源模式。

源码如下:

/**
 * Semaphore的acquire方法
 * 从信号量获取一个信号量,没有则一直阻塞,直到在其他线程提供信号量并唤醒或者线程被中断。
 *
 * @throws InterruptedException 如果调用此方法时已被中断或者等待时被中断
 */
public void acquire() throws InterruptedException 
    //内部调用AQS的acquireSharedInterruptibly方法
    //这实际上就是共享式可中断获取资源模版方法
    sync.acquireSharedInterruptibly(1);


/**
 * 从信号量获取permits个信号量,没有则一直阻塞,直到在其他线程提供信号量并唤醒或者线程被中断。
 *
 * @param permits 需要获取的信号量数量
 * @throws InterruptedException 如果调用此方法时已被中断或者等待时被中断
 */
public void acquire(int permits) throws InterruptedException 
    if (permits < 0) throw new IllegalArgumentException();
    //参数就是permits
    sync.acquireSharedInterruptibly(permits);


/**
 1. AQS的acquireSharedInterruptibly方法
 2. 共享式可中断获取信号量资源的模版方法
 3.  4. @param arg 需要获取的信号量资源数量
 5. @throws InterruptedException 如果调用此方法时已被中断或者等待时被中断
 */
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException 
    //最开始就检查一次,如果当前线程是被中断状态,直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //调用tryAcquireShared尝试获取共享信号量资源,这个方法是子类自己重写的
    //如果返回值小于0,表示当前线程共享信号量资源失败,否则表示成功
    //Semaphore的FairSync和NonfairSync对tryAcquireShared分别做出了公平和不公平的实现
    if (tryAcquireShared(arg) < 0)
        //获取不到就执行doAcquireSharedInterruptibly方法
        doAcquireSharedInterruptibly(arg);


tryAcquire():尝试获取信号量

public boolean tryAcquire()

仅在调用时至少存在至少一个可用信号量,才尝试获取一个信号量。

public boolean tryAcquire(int permits)

仅在调用时至少存在permits个的信号量,才尝试获取permits个信号量。

实际上内部就是直接调用的nonfairTryAcquireShared方法,即公平模式和非公平模式的tryAcquire实现是一样的!并且该方法不会阻塞线程,获取成功返回true,获取失败返回false!

源码如下:

public boolean tryAcquire(int permits) 
    if (permits < 0) throw new IllegalArgumentException();
    //调用nonfairTryAcquireShared方法
    return sync.nonfairTryAcquireShared(permits) >= 0;


public boolean tryAcquire(int permits) 
    if (permits < 0) throw new IllegalArgumentException();
    //调用nonfairTryAcquireShared方法
    return sync.nonfairTryAcquireShared(permits) >= 0;


release():释放信号量

public void release()

释放一个信号量,信号量总数加1。释放成功后,将唤醒在同步队列中等待获取信号量的结点(线程)!

public void release(int permits)

释放permits个信号量,信号量总数加permits。释放成功后,将唤醒在同步队列中等待获取信号量的结点(线程)

公平模式和非公平模式的信号量的释放都是一样的。实际上内部调用AQS的releaseShared方法,这实际上就是共享式释放资源的模版方法。

源码如下:

/**
 * 释放一个信号量,信号量总数加1。
 */
public void release() 
    //内部调用AQS的releaseShared方法
    //这实际上就是共享式释放资源的模版方法
    sync.releaseShared(1);


/**
 * 释放permits个信号量,信号量总数加permits。
 *
 * @param permits 释放的信号量个数
 */
public void release(int permits) 
    if (permits < 0) throw new IllegalArgumentException();
    //参数就是permits
    sync.releaseShared(permits);



/**
 * AQS的共享模式下释放资源的模版方法。
 * 如果成功释放则会调用doReleaseShared
 */
public final boolean releaseShared(int arg) 
    //tryReleaseShared释放信号量资源,该方法由子类自己实现
    if (tryReleaseShared(arg)) 
        //释放成功,必定调用doReleaseShared尝试唤醒后继结点,即阻塞的线程
        doReleaseShared();
        return true;
    
    return false;


/**
 * Sync的tryReleaseShared实现
 *
 * @param releases 要释放的资源数量
 * @return true 成功 false 失败
 */
protected final boolean tryReleaseShared(int releases) 
    for (; ; ) 
        //很简单,就是尝试CAS的增加state值,增加releases
        int current = getState();
        int next = current + releases;
        //这里可以知道,信号量资源数量不可超过int的最大值
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //CAS的增加state值,CAS成功之后返回true,否则循环重试
        if (compareAndSetState(current, next))
            return true;
    


Semaphore代码案例

Semaphore可以用来控制多线程对于共享资源访问的并发量!

案例:若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用,每个工人之多工作10秒,最后统计工作量。

我们可以通过Semaphore与之前的CountDownLatch搭配线程池来轻松实现。我们能发现,采用非公平模式的Semaphore时工人的总工作量大部分情况下要高于采用公平模式的工人总工作量,即非公平模式的执行效率更高(这是不一定的)。我们还能发现,在非公平模式工人的总工作量高于公平模式的工人总工作量时,非公平模式下总会有某些工人工(特别是工人0、1、2)作量更多,而另一些工人工作量更少,这就是线程饥饿(就是现在卷起来的环境)!

代码如下:

package com.yyl.threadTest;

import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.LockSupport;

public class SemaphoreTest 

    /**
     * 机器数目,实际上就是信号量为5,非公平模式
     */
    // private static Semaphore semaphore = new Semaphore(5, false);
    /**
     * 机器数目,实际上就是信号量为5,公平模式
     */
    private static Semaphore semaphore = new Semaphore(5, true);

    /**
     * 当所有工人都完成任务,那么统计工作量
     */
    private static CountDownLatch countDownLatch = new CountDownLatch(10);

    /**
     * 工人数目,8
     */
    private static final int NUM = 10;

    /**
     * 当前时间
     */
    private static final long NOW = System.nanoTime();

    /**
     * 纳秒单位
     */
    private static final long NANOUNIT = 1000000000;

    /**
     * 工作量
     */
    private static final LongAdder WORKLOAD = new LongAdder();

    /**
     * 工人线程:实现工作方法
     */
    static class Worker implements Runnable 
        public Worker(int num) 
            this.num = num;
        

        private int num;
        private long timed = 20 * NANOUNIT;

        @Override
        public void run() 
            while (true) 
                //获取信号量
                try 
                    if (semaphore.tryAcquire(timed, TimeUnit.NANOSECONDS)) 
                        System.out.println("工人" + this.num + "占用一个机器在生产...");
                        //占用一定时间
                        // LockSupport.parkNanos((long) (NANOUNIT * num * 0.5));
                        //统一调整为2秒,将会看到更明显的Semaphore效果
                        LockSupport.parkNanos((long) (NANOUNIT * 2));

                        System.out.println("工人" + this.num + "生产完毕,释放出机器");
                        //释放信号量
                        //每个工人最多执行20秒
                        WORKLOAD.increment();
                        if ((timed = timed - (System.nanoTime() - NOW)) <= 0) 
                            semaphore.release();
                            countDownLatch.countDown();
                            break;
                        
                        semaphore.release();
                     else 
                        countDownLatch.countDown();
                        break;
                    
                 catch (InterruptedException e) 
                    e.printStackTrace();
                

            
        
    

    public static void main(String[] args) throws InterruptedException 
        // 线程池模拟工人工作
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < NUM; i++) 
            executorService.execute(new Worker(i));
        
        executorService.shutdown();
        countDownLatch.await();
        System.out.println("工作完毕,空闲机器为:" + semaphore.availablePermits());
        System.out.println("总工作量为:" + WORKLOAD.sum());
    


运行结果如下:

以上是关于Java并发工具类:Semaphore的主要内容,如果未能解决你的问题,请参考以下文章

Java并发多线程编程——并发工具类Semaphore(信号量)

Java并发工具类控制并发线程数的Semaphore

Java并发工具类:Semaphore

Java并发工具类Semaphore

Java多线程并发工具类-信号量Semaphore对象讲解

Java并发工具类之并发数控制神器Semaphore