JAVA多线程提高十:同步工具CyclicBarrier与CountDownLatch
Posted pony1223
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JAVA多线程提高十:同步工具CyclicBarrier与CountDownLatch相关的知识,希望对你有一定的参考价值。
今天继续学习其它的同步工具:CyclicBarrier与CountDownLatch
一、CyclicBarrier
CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
CyclicBarrier类似于CountDownLatch也是个计数器, 不同的是CyclicBarrier数的是调用了CyclicBarrier.await()进入等待的线程数, 当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程被唤醒并继续。 CyclicBarrier就象它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍。 CyclicBarrier初始时还可带一个Runnable的参数,此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。
构造方法摘要:
造方法摘要 |
---|
CyclicBarrier(int parties) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在每个 barrier 上执行预定义的操作。 |
CyclicBarrier(int parties, Runnable barrierAction) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行 |
方法摘要
返回值 | 方法 |
---|---|
int | await() 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。 |
int | await(long timeout, TimeUnit unit) 在所有参与者都已经在此屏障上调用 await 方法之前,将一直等待。 |
int | getNumberWaiting() 返回当前在屏障处等待的参与者数目。 |
int | getParties() 返回要求启动此 barrier 的参与者数目。 |
boolean | isBroken() 查询此屏障是否处于损坏状态。 |
void | reset() 将屏障重置为其初始状态。 |
代码示例 :
示例一:
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(3);//创建CyclicBarrier对象并设置3个公共屏障点 for(int i=0;i<3;i++){ Runnable runnable = new Runnable(){ public void run(){ try { Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点1,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候"); cb.await();//到此如果没有达到公共屏障点,则该线程处于等待状态,如果达到公共屏障点则所有处于等待的线程都继续往下运行 Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点2,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候"); cb.await(); Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点3,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候"); cb.await(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } service.shutdown(); } }
输出:
线程pool-1-thread-2即将到达集合地点1,当前已有0个已经到达,正在等候 线程pool-1-thread-1即将到达集合地点1,当前已有1个已经到达,正在等候 线程pool-1-thread-3即将到达集合地点1,当前已有2个已经到达,正在等候 线程pool-1-thread-3即将到达集合地点2,当前已有0个已经到达,正在等候 线程pool-1-thread-1即将到达集合地点2,当前已有1个已经到达,正在等候 线程pool-1-thread-2即将到达集合地点2,当前已有2个已经到达,正在等候 线程pool-1-thread-3即将到达集合地点3,当前已有0个已经到达,正在等候 线程pool-1-thread-2即将到达集合地点3,当前已有1个已经到达,正在等候 线程pool-1-thread-1即将到达集合地点3,当前已有2个已经到达,正在等候
示例二:
如果在构造CyclicBarrier对象的时候传了一个Runnable对象进去,则每次到达公共屏障点的时候都最先执行这个传进去的Runnable,然后再执行处于等待的Runnable。如果把上面的例子改成下面这样:
package com.thread; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); //final CyclicBarrier cb = new CyclicBarrier(3);//创建CyclicBarrier对象并设置3个公共屏障点 final CyclicBarrier cb = new CyclicBarrier(3,new Runnable(){ @Override public void run() { System.out.println("********我最先执行***********"); } }); for(int i=0;i<3;i++){ Runnable runnable = new Runnable(){ public void run(){ try { Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点1,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候"); cb.await();//到此如果没有达到公共屏障点,则该线程处于等待状态,如果达到公共屏障点则所有处于等待的线程都继续往下运行 Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点2,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候"); cb.await(); //这里CyclicBarrier对象又可以重用 Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点3,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候"); cb.await(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } service.shutdown(); } }
结果
线程pool-1-thread-1即将到达集合地点1,当前已有0个已经到达,正在等候 线程pool-1-thread-3即将到达集合地点1,当前已有1个已经到达,正在等候 线程pool-1-thread-2即将到达集合地点1,当前已有2个已经到达,正在等候 ********我最先执行*********** 线程pool-1-thread-1即将到达集合地点2,当前已有0个已经到达,正在等候 线程pool-1-thread-3即将到达集合地点2,当前已有1个已经到达,正在等候 线程pool-1-thread-2即将到达集合地点2,当前已有2个已经到达,正在等候 ********我最先执行*********** 线程pool-1-thread-1即将到达集合地点3,当前已有0个已经到达,正在等候 线程pool-1-thread-3即将到达集合地点3,当前已有1个已经到达,正在等候 线程pool-1-thread-2即将到达集合地点3,当前已有2个已经到达,正在等候 ********我最先执行***********
二、CountDownLatch
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。
CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。
CountDownLatch 的一个有用特性是,它不要求调用 countDown 方法的线程等到计数到达零时才继续,而在所有线程都能通过之前,它只是阻止任何线程继续通过一个 await。
构造方法摘要
CountDownLatch(int count) 构造一个用给定计数初始化的 CountDownLatch。
方法摘要
返回值 | 方法 |
---|---|
void | await() 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。 |
boolean | await(long timeout, TimeUnit unit) 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。 |
void | countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。 |
long | getCount() 返回当前计数。 |
String | toString() 返回标识此锁存器及其状态的字符串。 |
代码示例
一种典型用法是,将一个问题分成 N 个部分,用执行每个部分并让锁存器倒计数的 Runnable 来描述每个部分,然后将所有 Runnable 加入到 Executor 队列。当所有的子部分完成后,协调线程就能够通过 await。(当线程必须用这种方法反复倒计数时,可改为使用 CyclicBarrier。)
示例一:
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountdownLatchTest1 { public static void main(String[] args) { ExecutorService service = Executors. newFixedThreadPool(3); final CountDownLatch latch = new CountDownLatch(3); for (int i = 0; i < 3; i++) { Runnable runnable = new Runnable() { @Override public void run() { try { System. out.println("子线程" + Thread.currentThread().getName() + "开始执行"); Thread. sleep((long) (Math. random() * 10000)); System. out.println("子线程" + Thread.currentThread().getName() + "执行完成"); latch.countDown(); // 当前线程调用此方法,则计数减一 } catch (InterruptedException e) { e.printStackTrace(); } } }; service.execute(runnable); } try { System. out.println("主线程" + Thread.currentThread().getName() + "等待子线程执行完成..." ); latch.await(); // 阻塞当前线程,直到计时器的值为0 System. out.println("主线程" + Thread.currentThread().getName() + "开始执行..."); } catch (InterruptedException e) { e.printStackTrace(); } } }
示例二:百米赛跑,4名运动员选手到达场地等待裁判口令,裁判一声口令,选手听到后同时起跑,当所有选手到达终点,裁判进行汇总汇总排名。
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountdownLatchTest2 { public static void main(String[] args) { ExecutorService service = Executors. newCachedThreadPool(); final CountDownLatch cdOrder = new CountDownLatch(1); final CountDownLatch cdAnswer = new CountDownLatch(4); for (int i = 0; i < 4; i++) { Runnable runnable = new Runnable() { public void run() { try { System. out.println("选手" + Thread.currentThread().getName() + "正等待裁判发布口令"); cdOrder.await(); System. out.println("选手" + Thread.currentThread().getName() + "已接受裁判口令"); Thread. sleep((long) (Math. random() * 10000)); System. out.println("选手" + Thread.currentThread().getName() + "到达终点"); cdAnswer.countDown(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } try { Thread. sleep((long) (Math. random() * 10000)); System. out.println("裁判" + Thread.currentThread ().getName() + "即将发布口令" ); cdOrder.countDown(); System. out.println("裁判" + Thread.currentThread ().getName() + "已发送口令,正在等待所有选手到达终点" ); cdAnswer.await(); System. out.println("所有选手都到达终点" ); System. out.println("裁判" + Thread.currentThread ().getName() + "汇总成绩排名" ); } catch (Exception e) { e.printStackTrace(); } service.shutdown(); } }
三、CountDownLatch与CyclicBarrier对比
CountDownLatch | CyclicBarrier |
---|---|
减计数方式 | 加计数方式 |
计算为0时释放所有等待的线程 | 计数达到指定值时释放所有等待线程 |
计数为0时,无法重置 | 计数达到指定值时,计数置为0重新开始 |
调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响 | 调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞 |
不可重复利用 | 可重复利用 |
其它学习资料:
CyclicBarrier的用法
CountDownLatch(倒计时计数器)使用说明
CyclicBarrier和CountDownLatch区别
参考:
《Java多线程与并发库高级应用》张孝祥
以上是关于JAVA多线程提高十:同步工具CyclicBarrier与CountDownLatch的主要内容,如果未能解决你的问题,请参考以下文章