java同步并发工具类CountDownLatchCyclicBarrier和Semaphore
Posted linghu-java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java同步并发工具类CountDownLatchCyclicBarrier和Semaphore相关的知识,希望对你有一定的参考价值。
闭锁CountDownLatch
闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如:
确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示“资源R已经被初始化”,而所有需要R的操作都必须现在这个闭锁上等待。 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S时,将首先在S依赖的其他服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖S的服务才能继续执行。 等待直到某个操作的所有参与者(例如,在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况下,当所有玩家都准备就绪时,闭锁将到达结束状态。
CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到0,这表示所有需要等待的事件都已经发生。如果计数器的值非0,那么await会一直阻塞直到计数器为0,或者等待中的线程中断,或者等待超时。·
如下代码展示了CountDownLatch的使用。
1 import java.util.concurrent.CountDownLatch; 2 3 /** 4 * @author bridge 5 */ 6 public class CountDownLatchTest { 7 8 9 public static void main(String[] args) { 10 final CountDownLatch latch = new CountDownLatch(2); 11 new Thread() { 12 public void run() { 13 try { 14 System.out.println("子线程" + Thread.currentThread().getName() + "正在执行"); 15 Thread.sleep(3000); 16 System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕"); 17 latch.countDown(); 18 } catch (InterruptedException e) { 19 e.printStackTrace(); 20 } 21 } 22 }.start(); 23 24 new Thread() { 25 public void run() { 26 try { 27 System.out.println("子线程" + Thread.currentThread().getName() + "正在执行"); 28 Thread.sleep(3000); 29 System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕"); 30 latch.countDown(); 31 } catch (InterruptedException e) { 32 e.printStackTrace(); 33 } 34 } 35 }.start(); 36 37 try { 38 System.out.println("等待2个子线程执行完毕..."); 39 latch.await(); 40 System.out.println("2个子线程已经执行完毕"); 41 System.out.println("继续执行主线程"); 42 } catch (InterruptedException e) { 43 e.printStackTrace(); 44 } 45 } 46 47 48 }
执行结果如下:
1 子线程Thread-0正在执行 2 等待2个子线程执行完毕... 3 子线程Thread-1正在执行 4 子线程Thread-0执行完毕 5 子线程Thread-1执行完毕 6 2个子线程已经执行完毕 7 继续执行主线程
Semaphore
计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
Semaphore中管理着一组虚拟的许可,许可的初始数量可通过构造函数来指定,在执行操作时可以首先获得许可,并在使用以后释放许可。如果没有一个许可给信号量,那么acquire将阻塞直到有许可(或者直到被中断或者操作超时)。release方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始化为1的Semaphore。二值信号量可以用作互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。
应用场景:
1.Semaphore可以用于实现资源池,例如数据库连接池。可以构造一个固定长度的资源池。将Semaphore的计数值初始化为池的大小,并从池中获取一个资源之前首先调用acquire方法获取一个许可,在将资源返回给池之后调用release释放许可,那么acquire将一直阻塞直到资源池不为空。
2.使用Semaphore将任何一种容器变成有界阻塞容器。信号量的计数值会初始化为容器的最大值。add操作在向底层容器添加一个元素之前,首先要获取一个许可。如果add操作没有添加任何元素,那么会立刻释放许可。同样,remove操作释放一个许可,使更多的元素能够添加到容器中。
1 import java.util.Collections; 2 import java.util.HashSet; 3 import java.util.Set; 4 import java.util.concurrent.Semaphore; 5 6 /** 7 * 信号量实现有界阻塞容器 8 * 9 * @author bridge 10 */ 11 class BoundedHashSet<t> { 12 private final Set<t> set; 13 private final Semaphore sem; 14 15 public BoundedHashSet(int bound) { 16 this.set = Collections.synchronizedSet(new HashSet<t>()); 17 sem = new Semaphore(bound); 18 } 19 20 public boolean add(T o) throws InterruptedException { 21 //获取许可 22 sem.acquire(); 23 boolean wasAdded = false; 24 25 try { 26 wasAdded = set.add(o); 27 return wasAdded; 28 } finally { 29 //如果添加失败,释放许可 30 if (!wasAdded) 31 sem.release(); 32 } 33 34 } 35 36 public boolean remove(Object o) { 37 38 boolean wasRemoved = set.remove(o); 39 //如果移除成功则释放许可 40 if (wasRemoved) 41 sem.release(); 42 return wasRemoved; 43 } 44 45 } 46 47 public class BoundedHashSetTest { 48 49 public static void main(String[] args) { 50 final BoundedHashSet<string> set = new BoundedHashSet<string>(5); 51 new Thread() { 52 @Override 53 public void run() { 54 System.out.println("添加"); 55 try { 56 set.add("A"); 57 set.add("B"); 58 set.add("C"); 59 set.add("D"); 60 set.add("E"); 61 set.add("F"); 62 System.out.println("添加完毕"); 63 } catch (InterruptedException e) { 64 e.printStackTrace(); 65 } 66 } 67 }.start(); 68 try { 69 Thread.sleep(2000); 70 } catch (InterruptedException e) { 71 e.printStackTrace(); 72 } 73 new Thread() { 74 @Override 75 public void run() { 76 System.out.println("移除"); 77 set.remove("A"); 78 // set.remove("B"); 79 // set.remove("F"); 80 System.out.println("移除完毕"); 81 } 82 }.start(); 83 } 84 } 85
运行结果
添加
移除
移除完毕
添加完毕
栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。栅栏用于实现一些协议,例如几个家庭决定在某个地方集合:”所有人6:00在麦当劳碰头,到了以后要等待其他人,之后再讨论下一步要做的事情。”
CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierExcepiton。
如果成功地通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引号来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。
CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。
下面的例子展示了CyclicBarrier的使用。
1 import java.util.concurrent.BrokenBarrierException; 2 import java.util.concurrent.CyclicBarrier; 3 4 /** 5 * @author bridge 6 */ 7 public class CyclicBarrierTest { 8 9 public static void main(String[] args) { 10 int N = 4; 11 CyclicBarrier cyclicBarrier = new CyclicBarrier(N); 12 for (int i = 0; i < N; i++) { 13 new Player(cyclicBarrier).start(); 14 } 15 16 } 17 18 19 } 20 21 class Player extends Thread { 22 23 private CyclicBarrier cyclicBarrier; 24 25 public Player(CyclicBarrier cyclicBarrier) { 26 this.cyclicBarrier = cyclicBarrier; 27 } 28 29 @Override 30 public void run() { 31 try { 32 Thread.sleep((long) (5000 * Math.random())); 33 System.out.println(Thread.currentThread().getName() + "准备好了!"); 34 cyclicBarrier.await(); 35 } catch (InterruptedException e) { 36 e.printStackTrace(); 37 } catch (BrokenBarrierException e) { 38 e.printStackTrace(); 39 } 40 System.out.println(Thread.currentThread().getName() + " 起跑!"); 41 } 42 }
以上是关于java同步并发工具类CountDownLatchCyclicBarrier和Semaphore的主要内容,如果未能解决你的问题,请参考以下文章