Java并发编程--Semaphore

Posted 在周末

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程--Semaphore相关的知识,希望对你有一定的参考价值。

概述

  信号量(Semaphore)控制同时访问资源的线程数量,支持公平和非公平两种方式获取许可。

使用

  提供的方法

 1 public Semaphore(int permits)    //permits为许可数,默认非公平方式
 2 public Semaphore(int permits, boolean fair)
 3 
 4 //获取一个许可。若获取成功,permits-1,直接返回;否则当前线程阻塞直到有permits被释放,除非线程被中断
 5 //如果线程被中断,则抛出 InterruptedException,并且清除当前线程的已中断状态。 
 6 public void acquire() throws InterruptedException
 7 //忽略中断
 8 public void acquireUninterruptibly()
 9 //尝试获取一个许可,成功返回true,否则返回false。
10 //即使已将此信号量设置为使用公平排序策略,但是调用 tryAcquire() 也将 立即获取许可(如果有一个可用),而不管当前是否有正在等待的线程。
11 public boolean tryAcquire()
12 //超时尝试获取一个许可,该方法遵循公平设置
13 public boolean tryAcquire(long timeout, TimeUnit unit)
14 //释放一个许可
15 public void release()
16 
17 //以上方法都是获取或释放一个许可,每个方法都存在对应的获取或释放指定个数许可的方法。例如public boolean tryAcquire(int permits)

  使用示例:

    使用信号量实现对内容池(例如线程池)的访问。

 1 class Pool {
 2     private static final int MAX_AVAILABLE = 100;    //许可数为100,在本例中也是内容池的item的个数。
 3     private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
 4 
 5     //获取池中的一个item,首先从Semaphore获取许可;获取许可成功后,从池中获取一个可用的item,并把该item标记为已使用。
 6     public Object getItem() throws InterruptedException {
 7         available.acquire();
 8         return getNextAvailableItem();
 9     }
10 
11     //将指定的item释放到池中,如果markAsUnused返回true,释放Semaphore的一个许可。
12     public void putItem(Object x) {
13         if (markAsUnused(x))
14             available.release();
15     }
16 
17     // Not a particularly efficient data structure; just for demo
18 
19     protected Object[] items = ... whatever kinds of items being managed    //内容池,例如:连接池,每个item代表一个连接。
20     protected boolean[] used = new boolean[MAX_AVAILABLE];    //标记池中的每个item是否已经被占用
21 
22     protected synchronized Object getNextAvailableItem() {
23         for (int i = 0; i < MAX_AVAILABLE; ++i) {
24             if (!used[i]) {
25                 used[i] = true;
26                 return items[i];
27             }
28         }
29         return null; // not reached
30     }
31 
32     protected synchronized boolean markAsUnused(Object item) {
33         for (int i = 0; i < MAX_AVAILABLE; ++i) {
34             if (item == items[i]) {
35                 if (used[i]) {
36                     used[i] = false;
37                     return true;
38                 } else
39                     return false;
40             }
41         }
42         return false;
43     }
44 }

实现原理

  基于AQS实现,用同步状态(state)表示许可数(permits),使用AQS的共享式获取和释放同步状态来实现permits的获取和释放。

  域

1 private final Sync sync;

 

    Sync是Semaphore的抽象内部类,继承了AQS。它有两个子类NonfairSync和FairSync,分别是非公平同步器和公平同步器。

    Sync的源码:

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {
        setState(permits);
    }

    final int getPermits() {
        return getState();
    }
    
    //非公平共享式获取同步状态。自旋CAS获取同步状态直到成功或许可不足。
    //返回值语义:负数代表获取失败、0代表获取成功但没有剩余资源、正数代表获取成功,还有剩余资源。
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining)) //此处CAS时有可能同步队列中已有等待的线程,就导致的不公平性
                return remaining;
        }
    }
    
    //自定义共享式释放同步状态。自旋CAS释放同步状态直到成功,除非overflow
    //返回值语义:true表示成功,不可能释放失败,除非overflow
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }

    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }

    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

 

    NonfairSync(非公平)源码:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

    FairSync(公平)源码:

 1 static final class FairSync extends Sync {
 2     private static final long serialVersionUID = 2014338818796000944L;
 3 
 4     FairSync(int permits) {
 5         super(permits);
 6     }
 7     //自定义共享式释放同步状态。
 8     protected int tryAcquireShared(int acquires) {
 9         for (;;) {
10             //首先检查同步队列中是否有前驱。如果有则返回失败,将当前线程加入到同步队列的尾部,保证先尝试获取同步状态的线程先成功。
11             if (hasQueuedPredecessors())
12                 return -1;
13             int available = getState();
14             int remaining = available - acquires;
15             if (remaining < 0 ||
16                 compareAndSetState(available, remaining))
17                 return remaining;
18         }
19     }
20 }

  方法

    除tryAcquire外,都是通过调用AQS提供的方法实现获取失败时的阻塞和唤醒机制,具体策略建AQS的源码。

 1 //阻塞式获取一个许可,响应中断
 2 public void acquire() throws InterruptedException {
 3     sync.acquireSharedInterruptibly(1);    //调用AQS提供的可响应中断共享式获取同步状态方法
 4 }
 5 
 6 //阻塞式获取一个许可,忽略中断
 7 public void acquireUninterruptibly() {
 8     sync.acquireShared(1);
 9 }
10 
11 //非阻塞式获取一个许可
12 public boolean tryAcquire() {
13     return sync.nonfairTryAcquireShared(1) >= 0;
14 }
15 
16 //释放一个许可
17 public void release() {
18     sync.releaseShared(1);
19 }

 

 

参考资料

  JDK DOC

  《Java并发编程的艺术》

 

以上是关于Java并发编程--Semaphore的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程-Semaphore

并发编程Semaphore详解

Java并发多线程编程——并发工具类Semaphore(信号量)

Java并发编程:CountDownLatchCyclicBarrier和Semaphore

Java并发编程:CountDownLatchCyclicBarrier和 Semaphore

Java并发编程:CountDownLatchCyclicBarrier和 Semaphore