关于Semaphore

Posted zjoe80

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了关于Semaphore相关的知识,希望对你有一定的参考价值。

一.Semaphore是什么

Semaphore 是一个计数信号量,必须由获取它的线程释放。用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。

Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的synchronized 关键字是实现不了的。

Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。

 

二.Semaphore能做什么

常用于限制可以访问某些资源的线程数量,例如通过 Semaphore 限流;实现一个文件允许的并发访问数。

Semaphore 可以用来限流(流量控制),在一些公共资源有限的场景下,Semaphore 可以派上用场。

比如在做日志清洗时,可能有几十个线程在并发清洗,但是将清洗的数据存入到数据库时,可能只给数据库分配了 10 个连接池,

这样两边的线程数就不对等了,必须保证同时只能有 10 个线程获取数据库链接,否则就会存在大量线程无法连接上数据库。

 

三.Semaphore原理

以一个停车场是运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。

这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。

这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。

这个停车系统中,每辆车就好比一个线程,看门人就好比一个信号量,看门人限制了可以活动的线程。

假如里面依然是三个车位,但是看门人改变了规则,要求每次只能停两辆车,那么一开始进入两辆车,后面得等到有车离开才能有车进入,但是得保证最多停两辆车。

对于Semaphore类而言,就如同一个看门人,限制了可活动的线程数。

Semaphore的主要方法摘要:

(1)void acquire():从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。

(2)void release():释放一个许可,将其返回给信号量。

(3)int availablePermits():返回此信号量中当前可用的许可数。

(4)boolean hasQueuedThreads():查询是否有线程正在等待获取。

Semaphore 跟锁(synchronized、Lock)有点相似,不同的地方是,锁同一时刻只允许一个线程访问某一资源,而 Semaphore 则可以控制同一时刻多个线程访问某一资源。

Semaphore(信号量)并不是 Java 语言特有的,几乎所有的并发语言都有。所以也就存在一个信号量模型的概念,如下图所示:

技术图片

信号量模型比较简单,可以概括为:一个计数器、一个队列、三个方法。

计数器:记录当前还可以运行多少个资源访问资源。

队列:待访问资源的线程

Semaphore 只有3个操作:初始化;增加;减少。

三个方法:

init():初始化计数器的值,可就是允许多少线程同时访问资源。

up():计数器加1,有线程归还资源时,如果计数器的值大于或者等于 0 时,从等待队列中唤醒一个线程。

down():计数器减 1,有线程占用资源时,如果此时计数器的值小于 0 ,线程将被阻塞。这三个方法都是原子性的,由实现方保证原子性。

Semaphore 是基于 AbstractQueuedSynchronizer 接口实现信号量模型的。

AbstractQueuedSynchronizer 提供了一个基于 FIFO 队列,可以用于构建锁或者其他相关同步装置的基础框架,利用了一个 int 来表示状态,通过类似 acquire 和 release 的方式来操纵状态。

在 Semaphore 类中,实现了两种信号量:公平的信号量和非公平的信号量,公平的信号量就是大家排好队,先到先进,非公平的信号量就是不一定先到先进,允许插队。

非公平的信号量效率会高一些,所以默认使用的是非公平信号量。具体的可以查看 Semaphore 类实现源码。

 

Semaphore用于管理信号量,在并发编程中,可以控制返访问同步代码的线程数量。Semaphore在实例化时传入一个int值,也就是指明信号数量。

主要方法有两个:acquire()和release()。acquire()用于请求信号,每调用一次,信号量便少一个。

release()用于释放信号,调用一次信号量加一个。信号量用完以后,后续使用acquire()方法请求信号的线程便会加入阻塞队列挂起。

Semaphore对于信号量的控制是基于AQS(AbstractQueuedSynchronizer)来做的。Semaphore有一个内部类Sync继承了AQS。

而且Semaphore中还有两个内部类FairSync和NonfairSync继承Sync,也就是说Semaphore有公平锁和非公平锁之分。

在 Semaphore 类中,实现了两种信号量:公平的信号量和非公平的信号量,公平的信号量就是大家排好队,先到先进,非公平的信号量就是不一定先到先进,允许插队。

非公平的信号量效率会高一些,所以默认使用的是非公平信号量。

以下是Semaphore中内部类的结构:

技术图片

 

四.Semaphore使用

例子一:

每个人的个人信息,那么一个人占用一个线程,并用Semphore类创建对象从而初始化信号量,控制可活动的线程数。

具体代码如下:

package concurrent;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
public class SemaphoreDemo {
private static final Semaphore semaphore=new Semaphore(3);
private static final ThreadPoolExecutor threadPool=new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()); private static class InformationThread extends Thread{ private final String name; private final int age; public InformationThread(String name,int age) { this.name=name; this.age=age; } public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+":大家好,我是"+name+"我今年"+age+"岁当前时间为:"+System.currentTimeMillis()); Thread.sleep(1000); System.out.println(name+"要准备释放许可证了,当前时间为:"+System.currentTimeMillis()); System.out.println("当前可使用的许可数为:"+semaphore.availablePermits()); semaphore.release(); } catch(InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { String[] name= {"李明","王五","张杰","王强","赵二","李四","张三"}; int[] age= {26,27,33,45,19,23,41}; for(int i=0;i<7;i++) { Thread t1=new InformationThread(name[i],age[i]); threadPool.execute(t1); } } }

运行结果:

pool-1-thread-3:大家好,我是张杰我今年33岁当前时间为:1520424000186
pool-1-thread-1:大家好,我是李明我今年26岁当前时间为:1520424000186
pool-1-thread-2:大家好,我是王五我今年27岁当前时间为:1520424000186
张杰要准备释放许可证了,当前时间为:1520424001187
李明要准备释放许可证了,当前时间为:1520424001187
王五要准备释放许可证了,当前时间为:1520424001187
当前可使用的许可数为:0
当前可使用的许可数为:0
当前可使用的许可数为:0
pool-1-thread-4:大家好,我是王强我今年45岁当前时间为:1520424001187
pool-1-thread-2:大家好,我是张三我今年41岁当前时间为:1520424001187
pool-1-thread-1:大家好,我是李四我今年23岁当前时间为:1520424001187
李四要准备释放许可证了,当前时间为:1520424002187
王强要准备释放许可证了,当前时间为:1520424002187
当前可使用的许可数为:0
张三要准备释放许可证了,当前时间为:1520424002187
pool-1-thread-5:大家好,我是赵二我今年19岁当前时间为:1520424002187
当前可使用的许可数为:0
当前可使用的许可数为:0
赵二要准备释放许可证了,当前时间为:1520424003188
当前可使用的许可数为:2

以上是非公平信号量,将建立Semaphore对象的语句改为如下语句:

private static final Semaphore semaphore=new Semaphore(3,true);

运行结果:

pool-1-thread-2:大家好,我是王五我今年27岁当前时间为:1520424286454
pool-1-thread-3:大家好,我是张杰我今年33岁当前时间为:1520424286454
pool-1-thread-1:大家好,我是李明我今年26岁当前时间为:1520424286454
pool-1-thread-1:李明要准备释放许可证了,当前时间为:1520424287455
当前可使用的许可数为:0
pool-1-thread-2:王五要准备释放许可证了,当前时间为:1520424287455
pool-1-thread-3:张杰要准备释放许可证了,当前时间为:1520424287455
当前可使用的许可数为:0
当前可使用的许可数为:1
pool-1-thread-1:大家好,我是李四我今年23岁当前时间为:1520424287455
pool-1-thread-5:大家好,我是赵二我今年19岁当前时间为:1520424287455
pool-1-thread-4:大家好,我是王强我今年45岁当前时间为:1520424287455
pool-1-thread-4:王强要准备释放许可证了,当前时间为:1520424288456
当前可使用的许可数为:0
pool-1-thread-1:李四要准备释放许可证了,当前时间为:1520424288456
pool-1-thread-3:大家好,我是张三我今年41岁当前时间为:1520424288456
pool-1-thread-5:赵二要准备释放许可证了,当前时间为:1520424288456
当前可使用的许可数为:0
当前可使用的许可数为:0
pool-1-thread-3:张三要准备释放许可证了,当前时间为:1520424289456
当前可使用的许可数为:2

实现单例模式

将创建信号量对象语句修改如下:

private static final Semaphore semaphore=new Semaphore(1);

运行结果:

pool-1-thread-1:大家好,我是李明我今年26岁当前时间为:1520424379699
pool-1-thread-1:李明要准备释放许可证了,当前时间为:1520424380700
当前可使用的许可数为:0
pool-1-thread-2:大家好,我是王五我今年27岁当前时间为:1520424380700
pool-1-thread-2:王五要准备释放许可证了,当前时间为:1520424381701
当前可使用的许可数为:0
pool-1-thread-3:大家好,我是张杰我今年33岁当前时间为:1520424381701
pool-1-thread-3:张杰要准备释放许可证了,当前时间为:1520424382702
当前可使用的许可数为:0
pool-1-thread-4:大家好,我是王强我今年45岁当前时间为:1520424382702
pool-1-thread-4:王强要准备释放许可证了,当前时间为:1520424383702
当前可使用的许可数为:0
pool-1-thread-5:大家好,我是赵二我今年19岁当前时间为:1520424383702
pool-1-thread-5:赵二要准备释放许可证了,当前时间为:1520424384702
当前可使用的许可数为:0
pool-1-thread-1:大家好,我是李四我今年23岁当前时间为:1520424384702
pool-1-thread-1:李四要准备释放许可证了,当前时间为:1520424385702
当前可使用的许可数为:0
pool-1-thread-2:大家好,我是张三我今年41岁当前时间为:1520424385702
pool-1-thread-2:张三要准备释放许可证了,当前时间为:1520424386703
当前可使用的许可数为:0

如上可知,如果将给定许可数设置为1,就如同一个单例模式。

 

例子二:

假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,

而数据库的连接数只有10个,这时必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。

代码如下:

public class SemaphoreTest {

        private static final int THREAD_COUNT = 30;

        private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
        private static Semaphore s = new Semaphore(10);
        public static void main(String[] args) {
            for (int i = 0; i < THREAD_COUNT; i++) {
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            s.acquire();
                            System.out.println("save data");
                            s.release();
                        } catch (InterruptedException e) {
                        }
                    }
                });
            }
            threadPool.shutdown();
        }
    }

在代码中,虽然有30个线程在执行,但是只允许10个并发的执行。Semaphore的构造方法Semaphore(int permits) 接受一个整型的数字,表示可用的许可证数量。

Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。

Semaphore的用法也很简单,首先线程使用Semaphore的acquire()获取一个许可证,使用完之后调用release()归还许可证。还可以用tryAcquire()方法尝试获取许可证。

 

以上是关于关于Semaphore的主要内容,如果未能解决你的问题,请参考以下文章

关于Semaphore

用synchronized实现Semaphore

用synchronized实现Semaphore

关于代码片段的时间复杂度

关于片段生命周期

关于js----------------分享前端开发常用代码片段