JAVA 多线程(10):join 的哥们和朋友 countDownLatchCyclicBarrierSemaphoreExchanger
Posted jony-it
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JAVA 多线程(10):join 的哥们和朋友 countDownLatchCyclicBarrierSemaphoreExchanger相关的知识,希望对你有一定的参考价值。
Join 方法可以使当前线程等待子线程,如果子线程未结束,则会一致处在wait状态。
因为其内部是通过wait 方法实现的,当执行完毕后会调用notifyAll 释放锁。
CountDownLatch 允许一个或多个线程等待其他线程完成操作,相比join ,能做的事情更多。
private static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args){ Thread t = new Thread(new Runnable() { @Override public void run() { System.out.println("begin"); countDownLatch.countDown(); System.out.println("middle"); countDownLatch.countDown(); } }); t.start(); try { countDownLatch.await(); System.out.println("end"); } catch (InterruptedException e) { e.printStackTrace(); } }
输出:
由结果看出,实现的效果与join相同。
对比join ,其构造函数有一个int参数,表示计数器,可以手动控制其需要等待的次数。每次调用countDown 会减去1,其wait方法会一直等待计数器变成0。
用法更加灵活,计数参数N 可以代表N个线程、N个步骤,总之可以自由的控制。
如果调用await(long time,TimeUnit unit)方法等待,那么当前线程在等待一定时间后就不会再做等待,而是继续执行当前线程(需要注意的地方是,此时子线程还是在执行中的)
Thread t = new Thread(new Runnable() { @Override public void run() { System.out.println("begin"); countDownLatch.countDown(); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("middle"); countDownLatch.countDown(); } }); t.start(); try { countDownLatch.await(2, TimeUnit.SECONDS); System.out.println("end"); } catch (InterruptedException e) { e.printStackTrace(); }
输出:
结果看出,主线程main在等待2秒后,发现子线程还是没有执行完毕,则继续执行,此时子线程并没有关闭,所以在等待到达5秒后,继续执行子线程输出middle。
CyclicBarrier 同步屏障
说明:中文意思为循环的屏障,要求指定线程到达屏障点,会可继续执行,如:
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(4); public static void main(String[] args){ Runnable runnable = new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"begin"); try { cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "end"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }; Thread t = new Thread(runnable,"A"); Thread t2 = new Thread(runnable,"B"); t.start(); t2.start(); try { cyclicBarrier.await(); System.out.println("main end"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }
输出:
由结果看出,因为屏障点设置的数量为4,实际上执行了await方法(调用一次则减一)只有3个,2个子线程和一个main线程,所以大家都没得玩了,如果把计算点改为3,就正常了(当计数器为0时,会通知所有阻塞的线程可以继续执行了)
如果计算器过小,比如有3个线程调用了wait,而计数器设置为2,那么前面2个先执行wait的线程会停止阻塞,计算器又会从2开始计算,也就是说,还需要一个线程调用await来释放它,如下:
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2); public static void main(String[] args){ Runnable runnable = new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"begin"); try { cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "end"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }; Thread t = new Thread(runnable,"A"); Thread t2 = new Thread(runnable,"B"); Thread t3 = new Thread(runnable,"C"); t.start(); t2.start(); t3.start(); try { cyclicBarrier.await(); System.out.println("main end"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }
输出
关于CyclicBarrier 的构造函数还有一种用法,就是优先执行,也就是说如果我们设置计算器为2,在线程A与线程B调用await后,在停止阻塞AB之前会优先执行默认的线程,如下:
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() { @Override public void run() { System.out.println("先执行我"); } }); public static void main(String[] args){ Runnable runnable = new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"begin"); try { cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "end"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }; Thread t = new Thread(runnable,"A"); t.start(); try { cyclicBarrier.await(); System.out.println("main end"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }
输出:
A线程先执行了输出了begin,然后到达屏障开始等待,此时主线程也同时调用了await,计算器的值2 - 1 - 1 = 0 ,准备释放A 和main,但是要先执行第二个参数线程,所以会先输出“先执行我”,然后A和main线程从block状态转换为runable 状态。
场景:计算多个线程的结果,利用第二个参数(线程)。如:
private CyclicBarrier barrier = new CyclicBarrier(4,this); private Executor executor = Executors.newFixedThreadPool(4); // 保存 private ConcurrentMap<String,Integer> map = new ConcurrentHashMap<>(); private void count(){ Runnable runnable = new Runnable() { @Override public void run() { // 记入运算结果 map.put(Thread.currentThread().getName(),1); try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }; for (int i = 0; i < 4; i++) { executor.execute(runnable); } } @Override public void run() { int result = 0; for (Map.Entry<String,Integer> entry:map.entrySet()) { result += entry.getValue(); } // map.put("result",result); System.out.println(result); } public static void main(String[] args){ TestCount testCount = new TestCount(); testCount.count(); }
输出:
结果输出4。
CountDownLatch 与CyclicBarrier:
CyclicBarrier 对比CountDownLatch 还可以调用reset方法重置。,让线程重新执行一次。
Semaphore:信号
可以控制并发量工具,比如有100个连接需要获取数据库连接,保存数据,但是数据库连接池最大连接数为10,那么可以通过这个工具类来控制,如下:
private static final int COUNT = 100; private static ExecutorService threadPool = Executors.newFixedThreadPool(COUNT); private static Semaphore semaphore = new Semaphore(10); public static void main(String[] args){ Runnable runnable = new Runnable() { @Override public void run() { try { semaphore.acquire(); System.out.println("保存数据"); Thread.sleep(2000); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }; for (int i = 0; i < COUNT; i++) { threadPool.execute(runnable); } threadPool.shutdown(); }
输出:
在代码中执行会发现,每次只有10个线程在执行,这就是信号灯的作用~ acquire 方法意为获取许可证,release方法意为归还许可证。
Exchanger:
线程之间交换数据彼此的数据,比如A线程执行到指定同步点后会等待B线程,当B线程也到达时,交换数据,然后各自拜拜~
private static Exchanger<String> exchanger = new Exchanger<>(); private static ExecutorService pool = Executors.newFixedThreadPool(2); public static void main(String[] args){ pool.execute(new Runnable() { @Override public void run() { try { String what = exchanger.exchange("钞票"); System.out.println("我是线程A,我拿到了"+what); } catch (InterruptedException e) { e.printStackTrace(); } } }); pool.execute(new Runnable() { @Override public void run() { try { String what = exchanger.exchange("香烟"); System.out.println("我是线程B,我拿到了"+what); } catch (InterruptedException e) { e.printStackTrace(); } } }); pool.shutdown(); }
输出:
由结果看出,线程A和线程B各自拿到了自己想要的东西~~
总结:平时使用的最多的应该是CountDownLatch,因为使用场景的关系,用它用的会比较多,这几个工具类各有特点~
像是CyclicBarrier 在执行一些大型计算的时候也许会用到,Semaphore对于一些共享资源的控制,Exchanger对于需要交换信息比较合适,一般是不同的事情并行处理会比较好,而且是必须要交换数据。
对于像是解析excel,json文件执行导入数据的操作个人认为使用CountDownLatch 就足够了~~ 哈哈
以上是关于JAVA 多线程(10):join 的哥们和朋友 countDownLatchCyclicBarrierSemaphoreExchanger的主要内容,如果未能解决你的问题,请参考以下文章