java同步并发工具类CountDownLatchCyclicBarrier和Semaphore

Posted linghu-java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java同步并发工具类CountDownLatchCyclicBarrier和Semaphore相关的知识,希望对你有一定的参考价值。

闭锁CountDownLatch

  闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如:

确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示“资源R已经被初始化”,而所有需要R的操作都必须现在这个闭锁上等待。 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S时,将首先在S依赖的其他服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖S的服务才能继续执行。 等待直到某个操作的所有参与者(例如,在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况下,当所有玩家都准备就绪时,闭锁将到达结束状态。

  CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到0,这表示所有需要等待的事件都已经发生。如果计数器的值非0,那么await会一直阻塞直到计数器为0,或者等待中的线程中断,或者等待超时。·

如下代码展示了CountDownLatch的使用。

 1 import java.util.concurrent.CountDownLatch;
 2  
 3 /**
 4  * @author bridge
 5  */
 6 public class CountDownLatchTest {
 7  
 8  
 9     public static void main(String[] args) {
10         final CountDownLatch latch = new CountDownLatch(2);
11         new Thread() {
12             public void run() {
13                 try {
14                     System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
15                     Thread.sleep(3000);
16                     System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
17                     latch.countDown();
18                 } catch (InterruptedException e) {
19                     e.printStackTrace();
20                 }
21             }
22         }.start();
23  
24         new Thread() {
25             public void run() {
26                 try {
27                     System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
28                     Thread.sleep(3000);
29                     System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
30                     latch.countDown();
31                 } catch (InterruptedException e) {
32                     e.printStackTrace();
33                 }
34             }
35         }.start();
36  
37         try {
38             System.out.println("等待2个子线程执行完毕...");
39             latch.await();
40             System.out.println("2个子线程已经执行完毕");
41             System.out.println("继续执行主线程");
42         } catch (InterruptedException e) {
43             e.printStackTrace();
44         }
45     }
46  
47  
48 }

执行结果如下:

1 子线程Thread-0正在执行
2 等待2个子线程执行完毕...
3 子线程Thread-1正在执行
4 子线程Thread-0执行完毕
5 子线程Thread-1执行完毕
6 2个子线程已经执行完毕
7 继续执行主线程

 

 

Semaphore

 

计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。

Semaphore中管理着一组虚拟的许可,许可的初始数量可通过构造函数来指定,在执行操作时可以首先获得许可,并在使用以后释放许可。如果没有一个许可给信号量,那么acquire将阻塞直到有许可(或者直到被中断或者操作超时)。release方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始化为1的Semaphore。二值信号量可以用作互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。

应用场景:

1.Semaphore可以用于实现资源池,例如数据库连接池。可以构造一个固定长度的资源池。将Semaphore的计数值初始化为池的大小,并从池中获取一个资源之前首先调用acquire方法获取一个许可,在将资源返回给池之后调用release释放许可,那么acquire将一直阻塞直到资源池不为空。

2.使用Semaphore将任何一种容器变成有界阻塞容器。信号量的计数值会初始化为容器的最大值。add操作在向底层容器添加一个元素之前,首先要获取一个许可。如果add操作没有添加任何元素,那么会立刻释放许可。同样,remove操作释放一个许可,使更多的元素能够添加到容器中。

  

 1 import java.util.Collections;
 2 import java.util.HashSet;
 3 import java.util.Set;
 4 import java.util.concurrent.Semaphore;
 5  
 6 /**
 7  * 信号量实现有界阻塞容器
 8  *
 9  * @author bridge
10  */
11 class BoundedHashSet<t> {
12     private final Set<t> set;
13     private final Semaphore sem;
14  
15     public BoundedHashSet(int bound) {
16         this.set = Collections.synchronizedSet(new HashSet<t>());
17         sem = new Semaphore(bound);
18     }
19  
20     public boolean add(T o) throws InterruptedException {
21         //获取许可
22         sem.acquire();
23         boolean wasAdded = false;
24  
25         try {
26             wasAdded = set.add(o);
27             return wasAdded;
28         } finally {
29             //如果添加失败,释放许可
30             if (!wasAdded)
31                 sem.release();
32         }
33  
34     }
35  
36     public boolean remove(Object o) {
37  
38         boolean wasRemoved = set.remove(o);
39         //如果移除成功则释放许可
40         if (wasRemoved)
41             sem.release();
42         return wasRemoved;
43     }
44  
45 }
46  
47 public class BoundedHashSetTest {
48  
49     public static void main(String[] args) {
50         final BoundedHashSet<string> set = new BoundedHashSet<string>(5);
51         new Thread() {
52             @Override
53             public void run() {
54                 System.out.println("添加");
55                 try {
56                     set.add("A");
57                     set.add("B");
58                     set.add("C");
59                     set.add("D");
60                     set.add("E");
61                     set.add("F");
62                     System.out.println("添加完毕");
63                 } catch (InterruptedException e) {
64                     e.printStackTrace();
65                 }
66             }
67         }.start();
68         try {
69             Thread.sleep(2000);
70         } catch (InterruptedException e) {
71             e.printStackTrace();
72         }
73         new Thread() {
74             @Override
75             public void run() {
76                 System.out.println("移除");
77                 set.remove("A");
78 //                set.remove("B");
79 //                set.remove("F");
80                 System.out.println("移除完毕");
81             }
82         }.start();
83     }
84 }
85 

 

运行结果

添加

移除
移除完毕
添加完毕
 
 

CyclicBarrier

 

栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。栅栏用于实现一些协议,例如几个家庭决定在某个地方集合:”所有人6:00在麦当劳碰头,到了以后要等待其他人,之后再讨论下一步要做的事情。”

CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierExcepiton。

如果成功地通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引号来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。

CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。

下面的例子展示了CyclicBarrier的使用。

 

 1 import java.util.concurrent.BrokenBarrierException;
 2 import java.util.concurrent.CyclicBarrier;
 3  
 4 /**
 5  * @author bridge
 6  */
 7 public class CyclicBarrierTest {
 8  
 9     public static void main(String[] args) {
10         int N = 4;
11         CyclicBarrier cyclicBarrier = new CyclicBarrier(N);
12         for (int i = 0; i < N; i++) {
13             new Player(cyclicBarrier).start();
14         }
15  
16     }
17  
18  
19 }
20  

21 class Player extends Thread {
22  
23     private CyclicBarrier cyclicBarrier;
24  
25     public Player(CyclicBarrier cyclicBarrier) {
26         this.cyclicBarrier = cyclicBarrier;
27     }
28  
29     @Override
30     public void run() {
31         try {
32             Thread.sleep((long) (5000 * Math.random()));
33             System.out.println(Thread.currentThread().getName() + "准备好了!");
34             cyclicBarrier.await();
35         } catch (InterruptedException e) {
36             e.printStackTrace();
37         } catch (BrokenBarrierException e) {
38             e.printStackTrace();
39         }
40         System.out.println(Thread.currentThread().getName() + " 起跑!");
41     }
42 }

  

 

执行结果

 

Thread-1准备好了!
Thread-3准备好了!
Thread-2准备好了!
Thread-0准备好了!
Thread-0 起跑!
Thread-1 起跑!
Thread-2 起跑!
Thread-3 起跑!

 

 





以上是关于java同步并发工具类CountDownLatchCyclicBarrier和Semaphore的主要内容,如果未能解决你的问题,请参考以下文章

JAVA并发同步工具类

java并发编程CountDownLatch类源码解析

java并发之同步辅助类CountDownLatch

Java并发和高并发学习总结- J.U.C之工具类

Java并发之CountDownLatch

Java并发编程--CountDownLatch