java多线程系列:一 并发工具类的使用_2 ( CountDownLatch CyclicBarrier Semaphore Exchanger )
Posted edisonhou
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java多线程系列:一 并发工具类的使用_2 ( CountDownLatch CyclicBarrier Semaphore Exchanger )相关的知识,希望对你有一定的参考价值。
- 前言- 本系列随笔 会深入浅出,解析java多线程的各种技术及实现。
- 随笔主要根据 《java并发编程的艺术》一书作为参考。 本系列以使用为主要目的,本人理解有限,还望读者辩证采纳,没有过多涉及源码的讨论,重在初学者的使用,理解伪码。
- 预备知识:1. volatile 关键字需要有一定理解 2. AQS 队列同步器有一定认知 (后续我会专门讲解AQS,先瞎听就够了,不懂就装懂!)
- 可能新手对下面部分内容完全不理解,我后续会继续更博,并最终将该系列排序。
- 全文 根据常用的 CountDownLatch 、 CyclicBarrier、 Semaphore 、 Exchanger 进行讲述如何使用
二、CyclicBarrier 同步屏障
1、CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数
量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞
2、CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrier-
Action),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
场景:
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。例如,用一个Excel保
存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户
的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日
均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流
水。
1 package E08工具类; 2 3 import java.util.Map; 4 import java.util.concurrent.*; 5 6 /** 7 * 银行水服务 8 * CyclicBarrier的应用 9 */ 10 public class BankWaterService implements Runnable { 11 /** 12 * 创建4个屏障,处理完之后执行当前类的run方法 13 */ 14 private CyclicBarrier c = new CyclicBarrier(4, this); 15 /** 16 * 假设只有4个sheet,所以只启动4个线程 17 */ 18 private ExecutorService executor = Executors.newFixedThreadPool(4); 19 /** 20 * 保存每个sheet计算出的银流结果 21 */ 22 private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>(); 23 24 private void count() { 25 for (int i = 0; i < 4; i++) { 26 executor.execute(new Runnable() { 27 @Override 28 public void run() { 29 // 计算当前sheet的银流数据,计算代码省略 30 sheetBankWaterCount.put(Thread.currentThread().getName(), 1); 31 // 银流计算完成,插入一个屏障 32 try { 33 c.await(); 34 System.out.println(Thread.currentThread().getName() + "--->结束"); 35 } catch (InterruptedException | BrokenBarrierException e) { 36 e.printStackTrace(); 37 } 38 } 39 }); 40 } 41 executor.shutdown(); 42 } 43 44 @Override //屏障state=0时,首先执行此方法 45 public void run() { 46 int result = 0; 47 // 汇总每个sheet计算出的结果 48 for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) { 49 result += sheet.getValue(); 50 } 51 // 将结果输出 52 sheetBankWaterCount.put("result", result); 53 System.out.println(result); 54 } 55 56 public static void main(String[] args) { 57 BankWaterService bankWaterCount = new BankWaterService(); 58 bankWaterCount.count(); 59 60 } 61 }
输出:4 pool-1-thread-4--->结束 pool-1-thread-3--->结束 pool-1-thread-2--->结束 pool-1-thread-1--->结束
流程:
1、4个线程均计算自己的金额
2、4个线程均被c.await()方法阻塞,
3、当第四个线程执行了c.await()后,所有线程从队列中返回,但是 屏障类 优先执行构造函数的第二个参数runnable。
3、其他特性
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重
置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数
器,并让线程重新执行一次。
CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier
阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。
1 Thread thread = new Thread(new Runnable() { 2 @Override 3 public void run() { 4 try { 5 c.await(); 6 } catch (Exception e) { 7 } 8 } 9 }); 10 thread.start(); 11 thread.interrupt(); //中断自己 12 try { 13 c.await(); 14 } catch (Exception e) { 15 System.out.println(c.isBroken()); //可以检查阻塞的线程是否被中断了 16 }
三、Semaphore 信号量
1、用来控制线程的并发数,其实打开其代码,可以发现 semaphore就是一个实现了Lock接口的锁,和ReetrantLock的实现原理相仿 (后续我再解析)。
可以看到它 存在一个静态内部类 Sync 继承了AQS,并且支持公平和非公平获取锁。
2、使用场景:
Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假
如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程
并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这
时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连
接。这个时候,就可以使用Semaphore来做流量控制 (说简单点就是 同时允许几个线程获取锁)
1 package E08工具类; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.Semaphore; 6 7 /** 8 * 信号测试 类似于锁了已经 可以同时允许 10个线程执行任务 9 */ 10 public class SemaphoreTest { 11 private static final int THREAD_COUNT = 30; 12 private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); 13 private static Semaphore s = new Semaphore(10); 14 15 public static void main(String[] args) { 16 for (int i = 0; i < THREAD_COUNT; i++) { 17 threadPool.execute(new Runnable() { 18 @Override 19 public void run() { 20 try { 21 s.acquire(); 22 System.out.println("save data"); 23 Thread.sleep(2000); 24 s.release(); 25 } catch (InterruptedException e) { 26 } 27 } 28 }); 29 } 30 threadPool.shutdown(); 31 } 32 }
结果我就不贴了,每次可以打印10次 save data 代表每次有10个线程 s.acquire(); 成功,其他20个线程在同步队列里阻塞,等待前一个节点释放锁并通知它。
四、Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交
换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过
exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也
执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产
出来的数据传递给对方。
Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换
两人的数据,并使用交叉规则得出2个交配结果。Exchanger也可以用于校对工作,比如我们需
要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行
录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否
录入一致。
1 package E08工具类; 2 3 import java.util.concurrent.Exchanger; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 7 public class ExchangerTest { 8 private static final Exchanger<String> exgr = new Exchanger<String>(); 9 private static ExecutorService threadPool = Executors.newFixedThreadPool(2); 10 public static void main(String[] args) { 11 threadPool.execute(new Runnable() { 12 @Override 13 public void run() { 14 try { 15 String A = "银行流水AA";// A录入银行流水数据 16 exgr.exchange(A); //传入A 17 } catch (InterruptedException e) { 18 } 19 } 20 }); 21 threadPool.execute(new Runnable() { 22 @Override 23 public void run() { 24 try { 25 String B = "银行流水B";// B录入银行流水数据 26 String A = exgr.exchange("呵呵"); 27 System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:" 28 + A + ",B录入是:" + B); 29 } catch (InterruptedException e) { 30 } 31 } 32 }); 33 threadPool.shutdown(); 34 } 35 }
结果:
A和B数据是否一致:false,A录入的是:银行流水AA,B录入是:银行流水B
tips: 这三个工具类我都没怎么细讲,主要是工作量比较大。。。。
五、总结:
1、CountDownLatch 本质上是一个 java1.5规范的锁,它的 await() countDown() 本质就是加锁 解锁的过程。
2、CylicBarrier 本质上是利用一个 ReentrantLock 对其执行的await()进行 加锁 解锁。
3、Semaphore 完全是一个 锁,设定了state的上限来保证每次并发数。
这三个工具类 有助于我们理解 同步队列器AbstractQueuedSynchronizer的思想,你可以从我分析的过程来理解 AQS,理解ReetrantLock,或者以后等我解析了 1.5的锁之后,再来理解这些工具类
但至少 目前使用是没啥问题了。
以上是关于java多线程系列:一 并发工具类的使用_2 ( CountDownLatch CyclicBarrier Semaphore Exchanger )的主要内容,如果未能解决你的问题,请参考以下文章
Java并发多线程编程——并发工具类CyclicBarrier(回环栅栏)
Java日期时间API系列4-----Jdk7及以前的日期时间类的线程安全问题
Java多线程系列:CountDownLatchSemaphore等4大并发工具类详解