并发编程之基础( 四)
Posted 被罚站的树
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程之基础( 四)相关的知识,希望对你有一定的参考价值。
新类库
前面已经把并发编程的基础知识讲的差不多了,这章主要介绍一下JAVA中其它一些关于并发编程的类库,主要有一下几个类库。
- CountDownLatch
- CyclicBarrier
- DelayQueue
- PriorityBlockingQueue
- ScheduleExecutor
- Semaphore
- Exchanger
1. CountDownLatch
该类主要是同步一个或多个任务,强制一个或多个任务等待其它任务执行的一组操作完成。可以给该对象设置一个初始计数值,当计数值不为0时,调用该对象的await()方法就会阻塞,调用counDown()方法会让计数值减1,当计数值为0时阻塞任务会被唤醒。其典型用法就是将一个程序分成多个独立的任务,并给CountDownLatch设定一个初始值,该初始值应该为首先需要执行的线程的个数(比如赛跑,5个运动员都做好准备之后,裁判才能打枪,这时初始值应该设置为5)。一些任务需要等待其它任务先完成或者其它任务的一部分完成,那么可以待用await()将自己挂起。而另一些任务的某些操作完成时调用countDown()方法来减小计数值,等待计数值为0时,挂起的任务则则认为当前所有的条件以满足继续执行的需要了,则可以继续运行。注意:计数值只能被设置一次且在new的时候就要指定初值,而且该对象只能使用一次,如果想重复使用,请考虑CyclicBarrier
1 package com.dy.xidian; 2 3 import java.util.Random; 4 import java.util.concurrent.CountDownLatch; 5 import java.util.concurrent.ExecutorService; 6 import java.util.concurrent.Executors; 7 import java.util.concurrent.TimeUnit; 8 9 class TaskPortion implements Runnable { 10 private static int counter = 0; 11 private final int id = counter++; 12 private static Random rand = new Random(47); 13 private final CountDownLatch latch; 14 15 public TaskPortion(CountDownLatch latch) { 16 super(); 17 this.latch = latch; 18 } 19 20 @Override 21 public void run() { 22 try { 23 doWork(); 24 latch.countDown(); 25 } catch (InterruptedException e) { 26 } 27 28 } 29 30 public void doWork() throws InterruptedException { 31 TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000)); 32 System.out.println(this + "completed"); 33 } 34 35 public String toString() { 36 return String.format("%1$-3d", id); 37 } 38 } 39 40 class WaitingTask implements Runnable { 41 private static int counter = 0; 42 private final int id = counter++; 43 private final CountDownLatch latch; 44 45 public WaitingTask(CountDownLatch latch) { 46 super(); 47 this.latch = latch; 48 } 49 50 @Override 51 public void run() { 52 try { 53 latch.await(); 54 System.out.println("Latch barrier passed for " + this); 55 } catch (InterruptedException e) { 56 System.out.println(this + " interrupted"); 57 } 58 } 59 60 public String toString() { 61 return String.format("WaitingTask %1$-3d ", id); 62 } 63 } 64 public class CountDownLatchDemo { 65 static final int SIZE = 100; 66 public static void main(String[] args) { 67 ExecutorService exec = Executors.newCachedThreadPool(); 68 CountDownLatch latch = new CountDownLatch(SIZE); 69 for (int i = 0; i < 10; i++) 70 exec.execute(new WaitingTask(latch)); 71 for (int i = 0; i < SIZE; i++) 72 exec.execute(new TaskPortion(latch)); 73 System.out.println("Launched all tasks"); 74 exec.shutdownNow(); 75 } 76 }
2. CyclicBarrier
CyclicBarrier与CountDownLatch功能差不多,不同之处就是可以多次使用,等到计数值变为0时,它会自动重置。而且不需要每个线程都去调用类似countDown()这样的方法,因为每调用一个await(),它就会自动将计数值减1。它使用于这种情况:多个线程并行执行工作,大家一致向前推进,所有线程在这个阶段的工作都完成了(所有的线程都调用了await方法),才能进入下一阶段,而对于那些早完成的线程只能先等待了。下面是一个赛马比赛,每个马可以看作一个线程,等所有的马都达到栅栏后,才能开始新一轮的比赛。
1 package com.dy.xidian; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.Random; 6 import java.util.concurrent.BrokenBarrierException; 7 import java.util.concurrent.CyclicBarrier; 8 import java.util.concurrent.ExecutorService; 9 import java.util.concurrent.Executors; 10 import java.util.concurrent.TimeUnit; 11 12 class Horse implements Runnable { 13 private static int counter = 0; 14 private final int id = counter++; 15 private int strides = 0; 16 private static Random rand = new Random(47); 17 private static CyclicBarrier barrier; 18 19 public Horse(CyclicBarrier b) { 20 barrier = b; 21 } 22 23 public synchronized int getStriders() { 24 return strides; 25 } 26 27 @Override 28 public void run() { 29 try { 30 while (!Thread.interrupted()) { 31 synchronized (this) { 32 strides += rand.nextInt(3); 33 } 34 barrier.await(); 35 } 36 } catch (InterruptedException e) { 37 // TODO 38 } catch (BrokenBarrierException e) { 39 throw new RuntimeException(e); 40 } 41 } 42 43 public String toString() { 44 return "Horse " + id + " "; 45 } 46 47 public String tracks() { 48 StringBuilder s = new StringBuilder(); 49 for (int i = 0; i < getStriders(); i++) 50 s.append("*"); 51 s.append(id); 52 return s.toString(); 53 } 54 } 55 56 public class HorseRace { 57 static final int FINISH_LINE = 75; 58 private List<Horse> horses = new ArrayList<Horse>(); 59 private ExecutorService exec = Executors.newCachedThreadPool(); 60 private CyclicBarrier barrier; 61 62 public HorseRace(int nHorses, final int pause) { 63 barrier = new CyclicBarrier(nHorses, new Runnable() { 64 65 @Override 66 public void run() { 67 StringBuilder s = new StringBuilder(); 68 for (int i = 0; i < FINISH_LINE; i++) { 69 s.append("="); 70 System.out.println(s); 71 for (Horse horse : horses) 72 System.out.println(horse.tracks()); 73 for (Horse horse : horses) 74 if (horse.getStriders() >= FINISH_LINE) { 75 System.out.println(horse + "won!"); 76 exec.shutdownNow(); 77 return; 78 } 79 try { 80 TimeUnit.MILLISECONDS.sleep(pause); 81 } catch (InterruptedException e) { 82 System.out.println("barrier-action sleep interrupted"); 83 } 84 } 85 } 86 }); 87 88 for (int i = 0; i < nHorses; i++) { 89 Horse horse = new Horse(barrier); 90 horses.add(horse); 91 exec.execute(horse); 92 } 93 } 94 95 public static void main(String[] args) { 96 int nHorses = 7; 97 int pause = 200; 98 if (args.length > 0) { 99 int n = new Integer(args[0]); 100 nHorses = n > 0 ? n : nHorses; 101 } 102 if (args.length > 1) { 103 int p = new Integer(args[1]); 104 pause = p > -1 ? p : pause; 105 } 106 new HorseRace(nHorses, pause); 107 } 108 }
运行结果:
= **0 ***1 *2 **3 *4 ***5 ***6 == **0 ***1 *2 **3 *4 ***5 ***6 === **0 ***1 *2 **3 *4 ***5 ***6 ====
运行结果中的==表示栅栏,数字为每个马的编号,*的个数代表每个马目前跑了多少步。在代码我,我们可以看到在创建CyclicBarrier对象时,我们还给他传递了一个复写了Runnable后的对象,这是我CounDownLatch不同的地方。每当计数器的值为0的是时候,里面的该对象中的run方法会被调用。可能有这样一种情况,当计数值再次变为0时,上次的run方法还没执行完,它会不会创建新的线程重新执行run方法呢?通过测试,这种情况是不会发生的,只有等run执行完,才会去创建新的线程。
3.DelayQueue
DelayQueue是一个无界的阻塞队列,用于存放实现了Delayed接口的对象,其中的对象只能在其延迟期满才能从队列中取走。该队列的头部是延迟期满后保存时间最长的Delayed元素。如果没有任何延迟期满的对象,那就不会有任何头元素,这时如果使用take()方法从队列获取对象时会发生阻塞,使用poll时会直接返回null。
1 package com.dy.xidian; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.Random; 6 import java.util.concurrent.DelayQueue; 7 import java.util.concurrent.Delayed; 8 import java.util.concurrent.ExecutorService; 9 import java.util.concurrent.Executors; 10 import java.util.concurrent.TimeUnit; 11 12 class DelayedTask implements Runnable, Delayed { 13 private static int counter = 0; 14 private final int id = counter++; 15 private final int delayTime; 16 private final long trigger; 17 18 protected static List<DelayedTask> sequeue = new ArrayList<DelayedTask>(); 19 20 // System.nanoTime()获取当前时间,结果是纳秒级 21 // TimeUnit.MILLSECONDS.convert(time, TimeUnit.SECONDS) 22 // 时间转换(一般是大单位转小单位),比如计算1s=多少ms之类的 23 // time是时间,TimeUnit.SECONDS是原始单位(s),MILLISECONDS是转换后的单位(ms) 24 public DelayedTask(int delayInMilliseconds) { 25 delayTime = delayInMilliseconds; 26 trigger = System.nanoTime() 27 + TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS); 28 sequeue.add(this); 29 } 30 31 // 重载Delayed接口的getDelay方法,该示例代码给出的是重载的标准形式 32 @Override 33 public long getDelay(TimeUnit unit) { 34 return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS); 35 36 } 37 38 //比较每个对象的触发时间,以确定在队列中的位置 39 @Override 40 public int compareTo(Delayed arg) { 41 DelayedTask that = (DelayedTask) arg; 42 if (trigger < that.trigger) 43 return -1; 44 if (trigger > that.trigger) 45 return 1; 46 return 0; 47 } 48 49 @Override 50 public void run() { 51 System.out.println(this + " "); 52 } 53 54 public String toString() { 55 return String.format("[%1$-4d]", counter) + "Task " + id; 56 } 57 58 public String summary() { 59 return "(" + id + ":" + counter + ")"; 60 } 61 62 public static class EndSentinel extends DelayedTask { 63 private ExecutorService exec; 64 65 public EndSentinel(int delay, ExecutorService e) { 66 super(delay); 67 exec = e; 68 } 69 70 public void run() { 71 for (DelayedTask pt : sequeue) { 72 System.out.println(pt.summary() + " "); 73 } 74 System.out.println(this + " Calling shutdownNow()"); 75 exec.shutdownNow(); 76 } 77 } 78 } 79 80 class DelayTaskConsumer implements Runnable { 81 private DelayQueue<DelayedTask> q; 82 public DelayTaskConsumer(DelayQueue<DelayedTask> q) { 83 this.q = q; 84 } 85 86 @Override 87 public void run() { 88 try { 89 while (!Thread.interrupted()) 90 q.take().run(); 91 } catch (InterruptedException e) { 92 } 93 System.out.println("Finised DelayedTaskConsumer!"); 94 } 95 96 } 97 public class DelayQueueDemo { 98 public static void main(String[] args) { 99 Random rand = new Random(47); 100 ExecutorService exec = Executors.newCachedThreadPool(); 101 DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>(); 102 for (int i = 0; i < 20; i++) 103 queue.put(new DelayedTask(rand.nextInt(5000))); 104 queue.add(new DelayedTask.EndSentinel(5000, exec)); 105 exec.execute(new DelayTaskConsumer(queue)); 106 } 107 }
运行结果:
1 [128 ]Task 11 2 [200 ]Task 7 3 [429 ]Task 5 4 [520 ]Task 18 5 [555 ]Task 1 6 [961 ]Task 4 7 [998 ]Task 16 8 [1207]Task 9 9 [1693]Task 2 10 [1809]Task 14 11 [1861]Task 3 12 [2278]Task 15 13 [3288]Task 10 14 [3551]Task 12 15 [4258]Task 0 16 [4258]Task 19 17 [4522]Task 8 18 [4589]Task 13 19 [4861]Task 17 20 [4868]Task 6 21 (0:21) 22 (1:21) 23 (2:21) 24 (3:21) 25 (4:21) 26 (5:21) 27 (6:21) 28 (7:21) 29 (8:21) 30 (9:21) 31 (10:21) 32 (11:21) 33 (12:21) 34 (13:21) 35 (14:21) 36 (15:21) 37 (16:21) 38 (17:21) 39 (18:21) 40 (19:21) 41 (20:21) 42 [5000]Task 20 Calling shutdownNow() 43 Finised DelayedTaskConsumer!
该程序创建了20个delayedTask对象,这20对象其实是线程对象,然后将这20对象放入DelayedQueue中,同时将这20个对象加入到list中以表明创建的先后顺序。每个线程的延迟期是通过随机数指定的。在DelayedTask中有一个内部类,该类的作用就是遍历list,输出每个线程的信息(id + 延迟期),最后关闭整个线程。DelayedTaskConsumer就是不断从DelayedQueue中取线程对象,然后让其执行。
关于Delayed接口的实现这里要强调一下,代码中写的是标准形式。delayTime是延迟期,需要我们指定。trigger表示这个对象的激活时间(比如到11点整时,其延迟期满),其计算方法就是获取当前时间+延迟期。而getDelay(TimeUnit unit)这个函数是个关键,这个函数会被调用两次:第一次查看延期满的时间点和当前时间之差(比如当前时间9点,延迟期满是在11点),发现是正值,对象需要继续等待;第二次查看时发现是负值(比如当前时间已经到了12点了),返回值为负数,说明对象的延迟期已经到了,可以使用了。
以上是关于并发编程之基础( 四)的主要内容,如果未能解决你的问题,请参考以下文章
Java Review - 并发编程_DelayQueue原理&源码剖析
Java Review - 并发编程_DelayQueue原理&源码剖析
并发编程-concurrent指南-阻塞队列-延迟队列DelayQueue