一、Semaphore功能介绍
Semaphore类相当于线程计数器,在获取Semaphore对象时设定可以产生的线程总数(线程并不是Semaphore类生成的,它只是统计线程的数量),创建Semaphore类对象如下方法所示:
//创建一个Semaphore对象,Sync sync对象赋值为NonfairSync对象 Semaphore sp = new Semaphore(1); //创建一个Semaphore对象,Sync sync对象赋值为FairSync对象 Semaphore sp = new Semaphore(1,true);
在创建线程以前调用Semaphore类的acquire()方法来判断是否还可以创建线程,acquire()方法每调用一次当前可创建的线程总数减一,并且这个方法是一个阻塞式的方法,如果当前线程数量已经达到上限线程会被阻塞,当满足创建线程的条件时程序就会继续,在线程运行结束以后调用Semaphore类release()方法来释放占用的可创建线程的数量。
结论:Semaphore类可以控制并发情况下创建的线程总数
二、Semaphore类方法分解
如下是Semaphore类的构造方法:
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
如下是对NonfairSync类和FairSync类的源码,从代码看似乎两个类对tryAcquireShared(int acquires)方法的实现完全不同,其实它们的实现基本相同,NonfairSync类调用的父类的nonfairTryAcquireShared(acquires)方法,此方法的实现如下所示,对比来看区别在于FairSync类在方法入口调用了hasQueuedPredecessors()方法添加了if判断,hasQueuedPredecessors代码如下所示。
/** * NonFair version */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } /** * Fair version */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
1、acquire()/acquire(int)方法介绍
如下所示,acquire()方法调用的是父类的acquireSharedInterruptibly(int arg)方法,这个方法调用子类的tryAcquireShared(int arg)如果没有线程数达到上限时则执行doAcquireSharedInterruptibly(arg),如下所示这个方法里面有一个死循环,当可创建的线程数量满足参数arg时,跳出死循环,创建线程的代码继续。
结论:acquire()是一个阻塞式的方法,从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者当前线程中断时抛出InterruptedException异常,中断阻塞。
。
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
2、acquireUninterruptibly()/acquireUninterruptibly()方法介绍
这两个方法和acquire()的两个方法基本是一样的,唯一不同是,这两个调用的方法acquireShared(int)没有了当前线程是否中断的if判断并且当前这个方法不抛InterruptedException异常,所以在当前线程被中断时当前阻塞的方法不会中断。
结论:acquireUninterruptibly是一个阻塞式的方法,从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
三、样例演示
如下代码是一个简单的样例,运行下面代码,从打印信息的顺序就可以验证获取信号量的方法是一个阻塞时的,其它方法的功能验证,网友自己完成吧!
public class ThreadTest { public static void main(String[] args) throws Exception { semaphoreTest(); } public static void semaphoreTest() throws InterruptedException { final Semaphore semaphore = new Semaphore(1); System.out.println("1"); semaphore.acquire(); Thread t1 = new Thread() { @Override public void run() { try { sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("释放"); semaphore.release(); } }; t1.start(); semaphore.acquire(); System.out.println("2"); } }