Java并发编程-Semaphore

Posted lgjava

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程-Semaphore相关的知识,希望对你有一定的参考价值。

基于AQS的前世今生,来学习并发工具类Semaphore。本文将从Semaphore的应用场景、源码原理解析来学习这个并发工具类。

1、 应用场景

  Semaphore用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。还可以用来实现某种资源池限制,或者对容器施加边界。

1.1   当成锁使用

  控制同时访问某个特定资源的操作数量,代码如下:

 1 public class SemaphoreLock 
 2     public static void main(String[] args) 
 3         //1、信号量为1时 相当于普通的锁  信号量大于1时 共享锁
 4         Output o = new Output();
 5         for (int i = 0; i < 5; i++) 
 6             new Thread(() -> o.output()).start();
 7         
 8     
 9 
10 class Output 
11     Semaphore semaphore = new Semaphore(1);
12 
13     public void output() 
14         try 
15             semaphore.acquire();
16             System.out.println(Thread.currentThread().getName() + " start at " + System.currentTimeMillis());
17             Thread.sleep(1000);
18             System.out.println(Thread.currentThread().getName() + " stop at " + System.currentTimeMillis());
19         catch(Exception e) 
20             e.printStackTrace();
21         finally 
22             semaphore.release();
23         
24     
25 

1.2   线程通信信号

  线程间通信,代码如下:

 1 public class SemaphoreCommunication 
 2     public static void main(String[] args) 
 3         //2、线程间进行通信
 4         Semaphore semaphore = new Semaphore(1);
 5         new SendingThread(semaphore,"SendingThread");
 6         new ReceivingThread(semaphore,"ReceivingThread");
 7     
 8 
 9 class SendingThread extends Thread 
10     Semaphore semaphore;
11     String name;
12 
13     public SendingThread(Semaphore semaphore,String name) 
14         this.semaphore = semaphore;
15         this.name = name;
16         new Thread(this).start();
17     
18 
19     public void run() 
20         try 
21             semaphore.acquire();
22             for (int i = 0; i < 5; i++) 
23                 System.out.println(name + ":" + i);
24                 Thread.sleep(1000);
25             
26          catch (Exception e) 
27             e.printStackTrace();
28         
29         semaphore.release();
30     
31 
32 
33 class ReceivingThread extends Thread 
34     Semaphore semaphore;
35     String name;
36 
37     public ReceivingThread(Semaphore semaphore,String name) 
38         this.semaphore = semaphore;
39         this.name = name;
40         new Thread(this).start();
41     
42 
43     public void run() 
44         try 
45             semaphore.acquire();
46             for (int i = 0; i < 5; i++) 
47                 System.out.println(name + ":" + i);
48                 Thread.sleep(1000);
49             
50          catch (Exception e) 
51             e.printStackTrace();
52         
53         semaphore.release();
54     
55 

1.3   资源池限制

  对资源池进行资源限制,代码如下:

 1 public class SemaphoreConnect 
 2     public static void main(String[] args) throws Exception 
 3         //3、模拟连接池数量限制
 4         ExecutorService executorService = Executors.newCachedThreadPool();
 5         for (int i = 0; i < 200; i++) 
 6             executorService.submit(new Runnable() 
 7                 @Override
 8                 public void run() 
 9                     Connection.getInstance().connect();
10                 
11             );
12         
13         executorService.shutdown();
14         executorService.awaitTermination(1, TimeUnit.DAYS);
15     
16 
17 class Connection 
18     private static Connection instance = new Connection();
19     private Semaphore semaphores = new Semaphore(10,true);
20     private int connections = 0;
21 
22     private Connection() 
23     
24 
25     public static Connection getInstance() 
26         return instance;
27     
28 
29     public void connect() 
30         try 
31             semaphores.acquire();
32             doConnect();
33          catch (InterruptedException e) 
34             e.printStackTrace();
35         finally 
36             semaphores.release();
37         
38     
39 
40     private void doConnect() 
41         synchronized (this) 
42             connections ++;
43             System.out.println("current get connections is : " + connections);
44         
45 
46         try 
47             Thread.sleep(2000);
48          catch (InterruptedException e) 
49             e.printStackTrace();
50         
51 
52         synchronized (this) 
53             connections --;
54             System.out.println("after release current  connections is : " + connections);
55         
56     
57 

1.4  容器边界限制

  对容器进行边界限制,代码如下:

 1 public class SemaphoreBoundedList 
 2     public static void main(String[] args) 
 3         //4、容器边界限制
 4         final BoundedList ba = new BoundedList(5);
 5         Runnable runnable1 = new Runnable() 
 6                 public void run() 
 7                     try 
 8                         ba.add("John");
 9                         ba.add("Martin");
10                         ba.add("Adam");
11                         ba.add("Prince");
12                         ba.add("Tod");
13                         System.out.println("Available Permits : " + ba.getSemaphore().availablePermits());
14                         ba.add("Tony");
15                         System.out.println("Final list: " + ba.getArrayList());
16                     catch (InterruptedException ie) 
17                         Thread.interrupted();
18                     
19                 
20         ;
21         Runnable runnable2 = new Runnable() 
22             public void run() 
23                 try 
24                     System.out.println("Before removing elements: "+ ba.getArrayList());
25                     Thread.sleep(5000);
26                     ba.remove("Martin");
27                     ba.remove("Adam");
28                 catch (InterruptedException ie) 
29                     Thread.interrupted();
30                 
31             
32         ;
33         Thread thread1 = new Thread(runnable1);
34         Thread thread2 = new Thread(runnable2);
35         thread1.start();
36         thread2.start();
37     
38 
39 class BoundedList<T> 
40     private final Semaphore semaphore;
41     private List arrayList;
42 
43     BoundedList(int limit) 
44         this.arrayList = Collections.synchronizedList(new ArrayList());
45         this.semaphore = new Semaphore(limit);
46     
47 
48 
49     public boolean add(T t) throws InterruptedException 
50         boolean added = false;
51         semaphore.acquire();
52         try 
53             added = arrayList.add(t);
54             return added;
55          finally 
56             if (!added)
57                 semaphore.release();
58         
59 
60     
61 
62 
63     public boolean remove(T t) 
64         boolean wasRemoved = arrayList.remove(t);
65         if (wasRemoved)
66             semaphore.release();
67         return wasRemoved;
68     
69 
70     public void remove(int index) 
71         arrayList.remove(index);
72         semaphore.release();
73     
74 
75     public List getArrayList() 
76         return arrayList;
77     
78 
79 
80     public Semaphore getSemaphore() 
81         return semaphore;
82     
83 

2、 源码原理解析

2.1 获取信号

  获取信号的方法如下:

public void acquire() throws InterruptedException 
   sync.acquireSharedInterruptibly(1);//共享式获取AQS的同步状态

调用的是AQS的acquireSharedInterruptibly方法:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException 
        if (Thread.interrupted())//线程中断 说明信号量对线程中断敏感
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0) //获取信号量失败 线程进入同步队列自旋等待
            doAcquireSharedInterruptibly(arg);
    

其中tryAcquireShared依赖的是Sync的实现,Sync提供了公平和非公平式的方式,先看非公平式。

protected int tryAcquireShared(int acquires) 
            return nonfairTryAcquireShared(acquires);
        
final int nonfairTryAcquireShared(int acquires) 
            for (;;) 
                int available = getState();//同步状态 当前的信号量许可数
                int remaining = available - acquires;//减去释放的信号量 剩余信号量许可数
                if (remaining < 0 ||//剩余信号量小于0 直接返回remaining 不做CAS
                    compareAndSetState(available, remaining))//CAS更新
                    return remaining;
            
        

再看下公平式的。

protected int tryAcquireShared(int acquires) 
            for (;;) 
                if (hasQueuedPredecessors())//判断同步队列如果存在前置节点 获取信号量失败  其他和非公平式是一致的
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            
        

最后来看下,如果未获取到信号量的处理方法doAcquireSharedInterruptibly。

 1 private void doAcquireSharedInterruptibly(int arg)
 2         throws InterruptedException 
 3         final Node node = addWaiter(Node.SHARED);//线程进入同步队列
 4         boolean failed = true;
 5         try 
 6             for (;;) //自旋
 7                 final Node p = node.predecessor();
 8                 if (p == head) //当前节点的前置节点是AQS的头节点 即自己是AQS同步队列的第一个节点
 9                     int r = tryAcquireShared(arg); //再去获取信号量
10                     if (r >= 0) //获取成功
11                         setHeadAndPropagate(node, r);//退出自旋
12                         p.next = null; // help GC
13                         failed = false;
14                         return;
15                     
16                 
17                 if (shouldParkAfterFailedAcquire(p, node) &&
18                     parkAndCheckInterrupt())
19                     throw new InterruptedException();
20             
21          finally 
22             if (failed)
23                 cancelAcquire(node); //获取失败 就取消获取
24         
25     

2.2 释放信号

  释放信号的方法如下:

public void release() 
        sync.releaseShared(1);
    

调用的是AQS的releaseShared方法:

public final boolean releaseShared(int arg) 
        if (tryReleaseShared(arg)) //释放信号量
            doReleaseShared();//唤醒后续的线程节点
            return true;
        
        return false;

tryReleaseShared交由子类Sync实现,代码如下:

protected final boolean tryReleaseShared(int releases) 
            for (;;) 
                int current = getState();//当前信号量许可数
                int next = current + releases; //当前信号量许可数+释放的信号量许可数
                if (next < current) // overflow 这个分支我看着永远走不进来呢
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))//CAS更新当前信号量许可数
                    return true;
            
        

释放成功后,则继续调用doReleaseShared,唤醒后续线程节点可以来争取信号量了。

 1 private void doReleaseShared() 
 2         for (;;) 
 3             Node h = head; //头节点
 4             if (h != null && h != tail) //同步队列中存在线程等待
 5                 int ws = h.waitStatus; //头节点线程状态
 6                 if (ws == Node.SIGNAL) //头节点线程状态为SIGNAL 唤醒后续线程节点
 7                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
 8                         continue;            // loop to recheck cases
 9                     unparkSuccessor(h); //唤醒下个节点
10                 
11                 else if (ws == 0 &&
12                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
13                     continue;                // loop on failed CAS
14             
15             if (h == head)                   // loop if head changed
16                 break;
17         
18     

总结:Semaphore使用AQS同步状态来保存信号量的当前计数。它里面定义的acquireSharedInterruptibly方法会减少计数,当计数为非正值时阻塞线程,releaseShared方法会增加计数,在计数不超过信号量限制时要解除线程的阻塞。

参考资料:

https://github.com/lingjiango/ConcurrentProgramPractice

https://www.caveofprogramming.com/java-multithreading/java-multithreading-semaphores-part-12.html

https://java2blog.com/java-semaphore-example/

http://tutorials.jenkov.com/java-util-concurrent/semaphore.html

以上是关于Java并发编程-Semaphore的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程-Semaphore

Java并发编程-Semaphore

Java并发多线程编程——Semaphore

Java并发编程:CountDownLatchCyclicBarrier和 Semaphore

Java并发编程:CountDownLatchCyclicBarrier和 Semaphore

Java并发编程:CountDownLatchCyclicBarrier和Semaphore