最近把《java并发编程实战》-Java Consurrency in Practice 重温了一遍,把书中提到的一些常用工具记录于此:
一、闭锁(门栓)- CountDownLatch
适用场景:多线程测试时,通常为了精确计时,要求所有线程都ready后,才开始执行,防止有线程先起跑,造成不公平,类似的,所有线程执行完,整个程序才算运行完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
/** * 闭锁测试(菩提树下的杨过 http://yjmyzz.cnblogs.com/) * * @throws InterruptedException */ @Test public void countdownLatch() throws InterruptedException { CountDownLatch startLatch = new CountDownLatch( 1 ); //类似发令枪 CountDownLatch endLatch = new CountDownLatch( 10 ); //这里的数量,要与线程数相同 for ( int i = 0 ; i < 10 ; i++) { Thread t = new Thread(() -> { try { startLatch.await(); //先等着,直到发令枪响,防止有线程先run System.out.println(Thread.currentThread().getName() + " is running..." ); Thread.sleep( 10 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { endLatch.countDown(); //每个线程执行完成后,计数 } }); t.setName( "线程-" + i); t.start(); } long start = System.currentTimeMillis(); startLatch.countDown(); //发令枪响,所有线程『开跑』 endLatch.await(); //等所有线程都完成 long end = System.currentTimeMillis(); System.out.println( "done! exec time => " + (end - start) + " ms" ); } |
执行结果:
线程-1 is running...
线程-5 is running...
线程-8 is running...
线程-4 is running...
线程-3 is running...
线程-0 is running...
线程-2 is running...
线程-9 is running...
线程-7 is running...
线程-6 is running...
done! exec time => 13 ms
注:大家可以把第14行注释掉,再看看运行结果有什么不同。
二、信号量(Semaphore)
适用场景:用于资源数有限制的并发访问场景。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
public class BoundedHashSet<T> { private final Set<T> set; private final Semaphore semaphore; public BoundedHashSet( int bound) { this .set = Collections.synchronizedSet( new HashSet<T>()); this .semaphore = new Semaphore(bound); } public boolean add(T t) throws InterruptedException { if (!semaphore.tryAcquire( 5 , TimeUnit.SECONDS)) { return false ; } ; boolean added = false ; try { added = set.add(t); return added; } finally { if (!added) { semaphore.release(); } } } public boolean remove(Object o) { boolean removed = set.remove(o); if (removed) { semaphore.release(); } return removed; } } @Test public void semaphoreTest() throws InterruptedException { BoundedHashSet<String> set = new BoundedHashSet<>( 5 ); for ( int i = 0 ; i < 6 ; i++) { if (set.add(i + "" )) { System.out.println(i + " added !" ); } else { System.out.println(i + " not add to Set!" ); } } } |
上面的示例将一个普通的Set变成了有界容器。执行结果如下:
0 added !
1 added !
2 added !
3 added !
4 added !
5 not add to Set!
三、栅栏CyclicBarrier
这个跟闭锁类似,可以通过代码设置一个『屏障』点,其它线程到达该点后才能继续,常用于约束其它线程都到达某一状态后,才允许做后面的事情。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
public class Worker extends Thread { private CyclicBarrier cyclicBarrier; public Worker(CyclicBarrier cyclicBarrier) { this .cyclicBarrier = cyclicBarrier; } private void step1() { System.out.println( this .getName() + " step 1 ..." ); } private void step2() { System.out.println( this .getName() + " step 2 ..." ); } public void run() { step1(); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } step2(); } } @Test public void cyclicBarrierTest() throws InterruptedException, BrokenBarrierException { CyclicBarrier cyclicBarrier = new CyclicBarrier( 11 ); for ( int i = 0 ; i < 10 ; i++) { Worker w = new Worker(cyclicBarrier); w.start(); } cyclicBarrier.await(); } |
这里我们假设有一个worder线程,里面有2步操作,要求所有线程完成step1后,才能继续step2. 执行结果如下:
Thread-0 step 1 ...
Thread-1 step 1 ...
Thread-2 step 1 ...
Thread-3 step 1 ...
Thread-4 step 1 ...
Thread-5 step 1 ...
Thread-6 step 1 ...
Thread-7 step 1 ...
Thread-8 step 1 ...
Thread-9 step 1 ...
Thread-9 step 2 ...
Thread-0 step 2 ...
Thread-3 step 2 ...
Thread-4 step 2 ...
Thread-6 step 2 ...
Thread-2 step 2 ...
Thread-1 step 2 ...
Thread-8 step 2 ...
Thread-7 step 2 ...
Thread-5 step 2 ...
四、Exchanger
如果2个线程需要交换数据,Exchanger就能派上用场了,见下面的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
@Test public void exchangerTest() { Exchanger<String> exchanger = new Exchanger<>(); Thread t1 = new Thread(() -> { String temp = "AAAAAA" ; System.out.println( "thread 1 交换前:" + temp); try { temp = exchanger.exchange(temp); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println( "thread 1 交换后:" + temp); }); Thread t2 = new Thread(() -> { String temp = "BBBBBB" ; System.out.println( "thread 2 交换前:" + temp); try { temp = exchanger.exchange(temp); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println( "thread 2 交换后:" + temp); }); t1.start(); t2.start(); } |
执行结果:
thread 1 交换前:AAAAAA
thread 2 交换前:BBBBBB
thread 2 交换后:AAAAAA
thread 1 交换后:BBBBBB
五、FutureTask/Future
一些很耗时的操作,可以用Future转化成异步,不阻塞后续的处理,直到真正需要返回结果时调用get拿到结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
@Test public void futureTaskTest() throws ExecutionException, InterruptedException, TimeoutException { Callable<String> callable = () -> { System.out.println( "很耗时的操作处理中。。。" ); Thread.sleep( 5000 ); return "done" ; }; FutureTask<String> futureTask = new FutureTask<>(callable); System.out.println( "就绪。。。" ); new Thread(futureTask).start(); System.out.println( "主线程其它处理。。。" ); System.out.println(futureTask.get()); System.out.println( "处理完成!" ); System.out.println( "-----------------" ); System.out.println( "executor 就绪。。。" ); ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<String> future = executorService.submit(callable); System.out.println(future.get( 10 , TimeUnit.SECONDS)); } |
执行结果:
就绪。。。
主线程其它处理。。。
很耗时的操作处理中。。。
done
处理完成!
-----------------
executor 就绪。。。
很耗时的操作处理中。。。
done
六、阻塞队列BlockingQueue
阻塞队列可以在线程间实现生产者-消费者模式。比如下面的示例:线程producer模拟快速生产数据,而线程consumer模拟慢速消费数据,当达到队列的上限时(即:生产者产生的数据,已经放不下了),队列就堵塞住了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
@Test public void blockingQueueTest() throws InterruptedException { final BlockingQueue<String> blockingDeque = new ArrayBlockingQueue<>( 5 ); Thread producer = new Thread() { public void run() { Random rnd = new Random(); while ( true ) { try { int i = rnd.nextInt( 10000 ); blockingDeque.put(i + "" ); System.out.println( this .getName() + " 产生了一个数字:" + i); Thread.sleep(rnd.nextInt( 50 )); //模拟生产者快速生产 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }; producer.setName( "producer 1" ); Thread consumer = new Thread() { public void run() { while ( true ) { Random rnd = new Random(); try { String i = blockingDeque.take(); System.out.println( this .getName() + " 消费了一个数字:" + i); Thread.sleep(rnd.nextInt( 10000 )); //消费者模拟慢速消费 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }; consumer.setName( "consumer 1" ); producer.start(); consumer.start(); while ( true ) { Thread.sleep( 100 ); } } |
执行结果:
producer 1 产生了一个数字:6773
consumer 1 消费了一个数字:6773
producer 1 产生了一个数字:4456
producer 1 产生了一个数字:8572
producer 1 产生了一个数字:5764
producer 1 产生了一个数字:2874
producer 1 产生了一个数字:780 # 注意这里就已经堵住了,直到有消费者消费一条数据,才能继续生产
consumer 1 消费了一个数字:4456
producer 1 产生了一个数字:4193