java多线程系列:一 并发工具类的使用_2 ( CountDownLatch CyclicBarrier Semaphore Exchanger )

Posted edisonhou

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java多线程系列:一 并发工具类的使用_2 ( CountDownLatch CyclicBarrier Semaphore Exchanger )相关的知识,希望对你有一定的参考价值。

  • 前言- 本系列随笔 会深入浅出,解析java多线程的各种技术及实现。
  • 随笔主要根据 《java并发编程的艺术》一书作为参考。 本系列以使用为主要目的,本人理解有限,还望读者辩证采纳,没有过多涉及源码的讨论,重在初学者的使用,理解伪码。
  • 预备知识:1. volatile 关键字需要有一定理解  2. AQS 队列同步器有一定认知 (后续我会专门讲解AQS,先瞎听就够了,不懂就装懂!)
  • 可能新手对下面部分内容完全不理解,我后续会继续更博,并最终将该系列排序。
  • 全文 根据常用的 CountDownLatch 、 CyclicBarrier、 Semaphore 、 Exchanger 进行讲述如何使用

 

二、CyclicBarrier 同步屏障

1、CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数
量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞

2、CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrier-

Action),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

场景: 

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。例如,用一个Excel保
存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户
的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日
均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流
水。

 

 1 package E08工具类;
 2 
 3 import java.util.Map;
 4 import java.util.concurrent.*;
 5 
 6 /**
 7  * 银行水服务
 8  * CyclicBarrier的应用
 9  */
10 public class BankWaterService implements Runnable {
11     /**
12      * 创建4个屏障,处理完之后执行当前类的run方法
13      */
14     private CyclicBarrier c = new CyclicBarrier(4, this);
15     /**
16      * 假设只有4个sheet,所以只启动4个线程
17      */
18     private ExecutorService executor = Executors.newFixedThreadPool(4);
19     /**
20      * 保存每个sheet计算出的银流结果
21      */
22     private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>();
23 
24     private void count() {
25         for (int i = 0; i < 4; i++) {
26             executor.execute(new Runnable() {
27                 @Override
28                 public void run() {
29                     // 计算当前sheet的银流数据,计算代码省略
30                     sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
31                     // 银流计算完成,插入一个屏障
32                     try {
33                         c.await();
34                         System.out.println(Thread.currentThread().getName() + "--->结束");
35                     } catch (InterruptedException | BrokenBarrierException e) {
36                         e.printStackTrace();
37                     }
38                 }
39             });
40         }
41         executor.shutdown();
42     }
43 
44     @Override   //屏障state=0时,首先执行此方法
45     public void run() {
46         int result = 0;
47 // 汇总每个sheet计算出的结果
48         for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
49             result += sheet.getValue();
50         }
51 // 将结果输出
52         sheetBankWaterCount.put("result", result);
53         System.out.println(result);
54     }
55 
56     public static void main(String[] args) {
57         BankWaterService bankWaterCount = new BankWaterService();
58         bankWaterCount.count();
59 
60     }
61 }

 

输出:4
pool-1-thread-4--->结束
pool-1-thread-3--->结束
pool-1-thread-2--->结束
pool-1-thread-1--->结束

 流程:

1、4个线程均计算自己的金额

2、4个线程均被c.await()方法阻塞,

3、当第四个线程执行了c.await()后,所有线程从队列中返回,但是 屏障类 优先执行构造函数的第二个参数runnable。

 

3、其他特性

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重

置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数
器,并让线程重新执行一次。
CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier
阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。

 1 Thread thread = new Thread(new Runnable() {
 2             @Override
 3             public void run() {
 4                 try {
 5                     c.await();
 6                 } catch (Exception e) {
 7                 }
 8             }
 9         });
10         thread.start();
11         thread.interrupt(); //中断自己
12         try {
13             c.await();
14         } catch (Exception e) {
15             System.out.println(c.isBroken());  //可以检查阻塞的线程是否被中断了
16         }

 

 

 

三、Semaphore  信号量

 1、用来控制线程的并发数,其实打开其代码,可以发现 semaphore就是一个实现了Lock接口的锁,和ReetrantLock的实现原理相仿 (后续我再解析)。

技术图片

可以看到它 存在一个静态内部类 Sync 继承了AQS,并且支持公平和非公平获取锁。

2、使用场景:

Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假
如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程
并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这
时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连
接。这个时候,就可以使用Semaphore来做流量控制 (说简单点就是 同时允许几个线程获取锁)

 

 1 package E08工具类;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 import java.util.concurrent.Semaphore;
 6 
 7 /**
 8  * 信号测试  类似于锁了已经 可以同时允许 10个线程执行任务
 9  */
10 public class SemaphoreTest {
11     private static final int THREAD_COUNT = 30;
12     private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
13     private static Semaphore s = new Semaphore(10);
14 
15     public static void main(String[] args) {
16         for (int i = 0; i < THREAD_COUNT; i++) {
17             threadPool.execute(new Runnable() {
18                 @Override
19                 public void run() {
20                     try {
21                         s.acquire();
22                         System.out.println("save data");
23                         Thread.sleep(2000);
24                         s.release();
25                     } catch (InterruptedException e) {
26                     }
27                 }
28             });
29         }
30         threadPool.shutdown();
31     }
32 }

 

结果我就不贴了,每次可以打印10次 save data 代表每次有10个线程  s.acquire(); 成功,其他20个线程在同步队列里阻塞,等待前一个节点释放锁并通知它。

 

 

四、Exchanger 

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交
换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过
exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也
执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产
出来的数据传递给对方。

Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换
两人的数据,并使用交叉规则得出2个交配结果。Exchanger也可以用于校对工作,比如我们需
要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行
录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否
录入一致。

 1 package E08工具类;
 2 
 3 import java.util.concurrent.Exchanger;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 
 7 public class ExchangerTest {
 8     private static final Exchanger<String> exgr = new Exchanger<String>();
 9     private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
10     public static void main(String[] args) {
11         threadPool.execute(new Runnable() {
12             @Override
13             public void run() {
14                 try {
15                     String A = "银行流水AA";// A录入银行流水数据
16                      exgr.exchange(A);  //传入A
17                 } catch (InterruptedException e) {
18                 }
19             }
20         });
21         threadPool.execute(new Runnable() {
22             @Override
23             public void run() {
24                 try {
25                     String B = "银行流水B";// B录入银行流水数据
26                     String A = exgr.exchange("呵呵");
27                     System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"
28                             + A + ",B录入是:" + B);
29                 } catch (InterruptedException e) {
30                 }
31             }
32         });
33         threadPool.shutdown();
34     }
35 }

 

结果:

A和B数据是否一致:false,A录入的是:银行流水AA,B录入是:银行流水B

 

tips: 这三个工具类我都没怎么细讲,主要是工作量比较大。。。。 

五、总结:

1、CountDownLatch 本质上是一个 java1.5规范的锁,它的 await() countDown() 本质就是加锁 解锁的过程。

2、CylicBarrier  本质上是利用一个 ReentrantLock 对其执行的await()进行 加锁 解锁。

3、Semaphore  完全是一个 锁,设定了state的上限来保证每次并发数。

这三个工具类 有助于我们理解 同步队列器AbstractQueuedSynchronizer的思想,你可以从我分析的过程来理解 AQS,理解ReetrantLock,或者以后等我解析了 1.5的锁之后,再来理解这些工具类

但至少 目前使用是没啥问题了。

 

以上是关于java多线程系列:一 并发工具类的使用_2 ( CountDownLatch CyclicBarrier Semaphore Exchanger )的主要内容,如果未能解决你的问题,请参考以下文章

Java——多线程高并发系列之理解CAS原子变量类的使用

Java并发多线程编程——并发工具类CyclicBarrier(回环栅栏)

Java日期时间API系列4-----Jdk7及以前的日期时间类的线程安全问题

Java多线程系列:CountDownLatchSemaphore等4大并发工具类详解

Java多线程系列:CountDownLatchSemaphore等4大并发工具类详解

Java并发多线程编程——并发工具类Exchanger