并发编程之基础( 四)

Posted 被罚站的树

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程之基础( 四)相关的知识,希望对你有一定的参考价值。

新类库

前面已经把并发编程的基础知识讲的差不多了,这章主要介绍一下JAVA中其它一些关于并发编程的类库,主要有一下几个类库。

  • CountDownLatch
  • CyclicBarrier
  • DelayQueue
  • PriorityBlockingQueue
  • ScheduleExecutor
  • Semaphore
  • Exchanger

1. CountDownLatch

  该类主要是同步一个或多个任务,强制一个或多个任务等待其它任务执行的一组操作完成。可以给该对象设置一个初始计数值,当计数值不为0时,调用该对象的await()方法就会阻塞,调用counDown()方法会让计数值减1,当计数值为0时阻塞任务会被唤醒。其典型用法就是将一个程序分成多个独立的任务,并给CountDownLatch设定一个初始值,该初始值应该为首先需要执行的线程的个数(比如赛跑,5个运动员都做好准备之后,裁判才能打枪,这时初始值应该设置为5)。一些任务需要等待其它任务先完成或者其它任务的一部分完成,那么可以待用await()将自己挂起。而另一些任务的某些操作完成时调用countDown()方法来减小计数值,等待计数值为0时,挂起的任务则则认为当前所有的条件以满足继续执行的需要了,则可以继续运行。注意:计数值只能被设置一次且在new的时候就要指定初值,而且该对象只能使用一次,如果想重复使用,请考虑CyclicBarrier

技术分享
 1 package com.dy.xidian;
 2 
 3 import java.util.Random;
 4 import java.util.concurrent.CountDownLatch;
 5 import java.util.concurrent.ExecutorService;
 6 import java.util.concurrent.Executors;
 7 import java.util.concurrent.TimeUnit;
 8 
 9 class TaskPortion implements Runnable {
10     private static int counter = 0;
11     private final int id = counter++;
12     private static Random rand = new Random(47);
13     private final CountDownLatch latch;
14 
15     public TaskPortion(CountDownLatch latch) {
16         super();
17         this.latch = latch;
18     }
19 
20     @Override
21     public void run() {
22         try {
23             doWork();
24             latch.countDown();
25         } catch (InterruptedException e) {
26         }
27 
28     }
29 
30     public void doWork() throws InterruptedException {
31         TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
32         System.out.println(this + "completed");
33     }
34 
35     public String toString() {
36         return String.format("%1$-3d", id);
37     }
38 }
39 
40 class WaitingTask implements Runnable {
41     private static int counter = 0;
42     private final int id = counter++;
43     private final CountDownLatch latch;
44 
45     public WaitingTask(CountDownLatch latch) {
46         super();
47         this.latch = latch;
48     }
49 
50     @Override
51     public void run() {
52         try {
53             latch.await();
54             System.out.println("Latch barrier passed for " + this);
55         } catch (InterruptedException e) {
56             System.out.println(this + " interrupted");
57         }
58     }
59 
60     public String toString() {
61         return String.format("WaitingTask %1$-3d ", id);
62     }
63 }
64 public class CountDownLatchDemo {
65     static final int SIZE = 100;
66     public static void main(String[] args) {
67         ExecutorService exec = Executors.newCachedThreadPool();
68         CountDownLatch latch = new CountDownLatch(SIZE);
69         for (int i = 0; i < 10; i++)
70             exec.execute(new WaitingTask(latch));
71         for (int i = 0; i < SIZE; i++)
72             exec.execute(new TaskPortion(latch));
73         System.out.println("Launched all tasks");
74         exec.shutdownNow();
75     }
76 }
View Code

2. CyclicBarrier

  CyclicBarrier与CountDownLatch功能差不多,不同之处就是可以多次使用,等到计数值变为0时,它会自动重置。而且不需要每个线程都去调用类似countDown()这样的方法,因为每调用一个await(),它就会自动将计数值减1。它使用于这种情况:多个线程并行执行工作,大家一致向前推进,所有线程在这个阶段的工作都完成了(所有的线程都调用了await方法),才能进入下一阶段,而对于那些早完成的线程只能先等待了。下面是一个赛马比赛,每个马可以看作一个线程,等所有的马都达到栅栏后,才能开始新一轮的比赛。

技术分享
  1 package com.dy.xidian;
  2 
  3 import java.util.ArrayList;
  4 import java.util.List;
  5 import java.util.Random;
  6 import java.util.concurrent.BrokenBarrierException;
  7 import java.util.concurrent.CyclicBarrier;
  8 import java.util.concurrent.ExecutorService;
  9 import java.util.concurrent.Executors;
 10 import java.util.concurrent.TimeUnit;
 11 
 12 class Horse implements Runnable {
 13     private static int counter = 0;
 14     private final int id = counter++;
 15     private int strides = 0;
 16     private static Random rand = new Random(47);
 17     private static CyclicBarrier barrier;
 18 
 19     public Horse(CyclicBarrier b) {
 20         barrier = b;
 21     }
 22 
 23     public synchronized int getStriders() {
 24         return strides;
 25     }
 26 
 27     @Override
 28     public void run() {
 29         try {
 30             while (!Thread.interrupted()) {
 31                 synchronized (this) {
 32                     strides += rand.nextInt(3);
 33                 }
 34                 barrier.await();
 35             }
 36         } catch (InterruptedException e) {
 37             // TODO
 38         } catch (BrokenBarrierException e) {
 39             throw new RuntimeException(e);
 40         }
 41     }
 42 
 43     public String toString() {
 44         return "Horse " + id + " ";
 45     }
 46 
 47     public String tracks() {
 48         StringBuilder s = new StringBuilder();
 49         for (int i = 0; i < getStriders(); i++)
 50             s.append("*");
 51         s.append(id);
 52         return s.toString();
 53     }
 54 }
 55 
 56 public class HorseRace {
 57     static final int FINISH_LINE = 75;
 58     private List<Horse> horses = new ArrayList<Horse>();
 59     private ExecutorService exec = Executors.newCachedThreadPool();
 60     private CyclicBarrier barrier;
 61 
 62     public HorseRace(int nHorses, final int pause) {
 63         barrier = new CyclicBarrier(nHorses, new Runnable() {
 64 
 65             @Override
 66             public void run() {
 67                 StringBuilder s = new StringBuilder();
 68                 for (int i = 0; i < FINISH_LINE; i++) {
 69                     s.append("=");
 70                     System.out.println(s);
 71                     for (Horse horse : horses)
 72                         System.out.println(horse.tracks());
 73                     for (Horse horse : horses)
 74                         if (horse.getStriders() >= FINISH_LINE) {
 75                             System.out.println(horse + "won!");
 76                             exec.shutdownNow();
 77                             return;
 78                         }
 79                     try {
 80                         TimeUnit.MILLISECONDS.sleep(pause);
 81                     } catch (InterruptedException e) {
 82                         System.out.println("barrier-action sleep interrupted");
 83                     }
 84                 }
 85             }
 86         });
 87 
 88         for (int i = 0; i < nHorses; i++) {
 89             Horse horse = new Horse(barrier);
 90             horses.add(horse);
 91             exec.execute(horse);
 92         }
 93     }
 94 
 95     public static void main(String[] args) {
 96         int nHorses = 7;
 97         int pause = 200;
 98         if (args.length > 0) {
 99             int n = new Integer(args[0]);
100             nHorses = n > 0 ? n : nHorses;
101         }
102         if (args.length > 1) {
103             int p = new Integer(args[1]);
104             pause = p > -1 ? p : pause;
105         }
106         new HorseRace(nHorses, pause);
107     }
108 }
View Code

运行结果:

=
**0
***1
*2
**3
*4
***5
***6
==
**0
***1
*2
**3
*4
***5
***6
===
**0
***1
*2
**3
*4
***5
***6
====

  运行结果中的==表示栅栏,数字为每个马的编号,*的个数代表每个马目前跑了多少步。在代码我,我们可以看到在创建CyclicBarrier对象时,我们还给他传递了一个复写了Runnable后的对象,这是我CounDownLatch不同的地方。每当计数器的值为0的是时候,里面的该对象中的run方法会被调用。可能有这样一种情况,当计数值再次变为0时,上次的run方法还没执行完,它会不会创建新的线程重新执行run方法呢?通过测试,这种情况是不会发生的,只有等run执行完,才会去创建新的线程。

 3.DelayQueue

  DelayQueue是一个无界的阻塞队列,用于存放实现了Delayed接口的对象,其中的对象只能在其延迟期满才能从队列中取走。该队列的头部是延迟期满后保存时间最长的Delayed元素。如果没有任何延迟期满的对象,那就不会有任何头元素,这时如果使用take()方法从队列获取对象时会发生阻塞,使用poll时会直接返回null。

技术分享
  1 package com.dy.xidian;
  2 
  3 import java.util.ArrayList;
  4 import java.util.List;
  5 import java.util.Random;
  6 import java.util.concurrent.DelayQueue;
  7 import java.util.concurrent.Delayed;
  8 import java.util.concurrent.ExecutorService;
  9 import java.util.concurrent.Executors;
 10 import java.util.concurrent.TimeUnit;
 11 
 12 class DelayedTask implements Runnable, Delayed {
 13     private static int counter = 0;
 14     private final int id = counter++;
 15     private final int delayTime;
 16     private final long trigger;
 17 
 18     protected static List<DelayedTask> sequeue = new ArrayList<DelayedTask>();
 19 
 20     // System.nanoTime()获取当前时间,结果是纳秒级
 21     // TimeUnit.MILLSECONDS.convert(time, TimeUnit.SECONDS)
 22     // 时间转换(一般是大单位转小单位),比如计算1s=多少ms之类的
 23     // time是时间,TimeUnit.SECONDS是原始单位(s),MILLISECONDS是转换后的单位(ms)
 24     public DelayedTask(int delayInMilliseconds) {
 25         delayTime = delayInMilliseconds;
 26         trigger = System.nanoTime()
 27                 + TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS);
 28         sequeue.add(this);
 29     }
 30 
 31     // 重载Delayed接口的getDelay方法,该示例代码给出的是重载的标准形式
 32     @Override
 33     public long getDelay(TimeUnit unit) {
 34         return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);
 35 
 36     }
 37 
 38     //比较每个对象的触发时间,以确定在队列中的位置
 39     @Override
 40     public int compareTo(Delayed arg) {
 41         DelayedTask that = (DelayedTask) arg;
 42         if (trigger < that.trigger)
 43             return -1;
 44         if (trigger > that.trigger)
 45             return 1;
 46         return 0;
 47     }
 48 
 49     @Override
 50     public void run() {
 51         System.out.println(this + " ");
 52     }
 53 
 54     public String toString() {
 55         return String.format("[%1$-4d]", counter) + "Task " + id;
 56     }
 57 
 58     public String summary() {
 59         return "(" + id + ":" + counter + ")";
 60     }
 61 
 62     public static class EndSentinel extends DelayedTask {
 63         private ExecutorService exec;
 64 
 65         public EndSentinel(int delay, ExecutorService e) {
 66             super(delay);
 67             exec = e;
 68         }
 69 
 70         public void run() {
 71             for (DelayedTask pt : sequeue) {
 72                 System.out.println(pt.summary() + " ");
 73             }
 74             System.out.println(this + " Calling shutdownNow()");
 75             exec.shutdownNow();
 76         }
 77     }
 78 }
 79 
 80 class DelayTaskConsumer implements Runnable {
 81     private DelayQueue<DelayedTask> q;
 82     public DelayTaskConsumer(DelayQueue<DelayedTask> q) {
 83         this.q = q;
 84     }
 85 
 86     @Override
 87     public void run() {
 88         try {
 89             while (!Thread.interrupted())
 90                 q.take().run();
 91         } catch (InterruptedException e) {
 92         }
 93         System.out.println("Finised DelayedTaskConsumer!");
 94     }
 95 
 96 }
 97 public class DelayQueueDemo {
 98     public static void main(String[] args) {
 99         Random rand = new Random(47);
100         ExecutorService exec = Executors.newCachedThreadPool();
101         DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
102         for (int i = 0; i < 20; i++)
103             queue.put(new DelayedTask(rand.nextInt(5000)));
104         queue.add(new DelayedTask.EndSentinel(5000, exec));
105         exec.execute(new DelayTaskConsumer(queue));
106     }
107 }
View Code

运行结果:

技术分享
 1 [128 ]Task 11 
 2 [200 ]Task 7 
 3 [429 ]Task 5 
 4 [520 ]Task 18 
 5 [555 ]Task 1 
 6 [961 ]Task 4 
 7 [998 ]Task 16 
 8 [1207]Task 9 
 9 [1693]Task 2 
10 [1809]Task 14 
11 [1861]Task 3 
12 [2278]Task 15 
13 [3288]Task 10 
14 [3551]Task 12 
15 [4258]Task 0 
16 [4258]Task 19 
17 [4522]Task 8 
18 [4589]Task 13 
19 [4861]Task 17 
20 [4868]Task 6 
21 (0:21) 
22 (1:21) 
23 (2:21) 
24 (3:21) 
25 (4:21) 
26 (5:21) 
27 (6:21) 
28 (7:21) 
29 (8:21) 
30 (9:21) 
31 (10:21) 
32 (11:21) 
33 (12:21) 
34 (13:21) 
35 (14:21) 
36 (15:21) 
37 (16:21) 
38 (17:21) 
39 (18:21) 
40 (19:21) 
41 (20:21) 
42 [5000]Task 20 Calling shutdownNow()
43 Finised DelayedTaskConsumer!
View Code

  该程序创建了20个delayedTask对象,这20对象其实是线程对象,然后将这20对象放入DelayedQueue中,同时将这20个对象加入到list中以表明创建的先后顺序。每个线程的延迟期是通过随机数指定的。在DelayedTask中有一个内部类,该类的作用就是遍历list,输出每个线程的信息(id + 延迟期),最后关闭整个线程。DelayedTaskConsumer就是不断从DelayedQueue中取线程对象,然后让其执行。

  关于Delayed接口的实现这里要强调一下,代码中写的是标准形式。delayTime是延迟期,需要我们指定。trigger表示这个对象的激活时间(比如到11点整时,其延迟期满),其计算方法就是获取当前时间+延迟期。而getDelay(TimeUnit unit)这个函数是个关键,这个函数会被调用两次:第一次查看延期满的时间点和当前时间之差(比如当前时间9点,延迟期满是在11点),发现是正值,对象需要继续等待;第二次查看时发现是负值(比如当前时间已经到了12点了),返回值为负数,说明对象的延迟期已经到了,可以使用了。 

以上是关于并发编程之基础( 四)的主要内容,如果未能解决你的问题,请参考以下文章

java并发之DelayQueue实际运用示例

Java Review - 并发编程_DelayQueue原理&源码剖析

Java Review - 并发编程_DelayQueue原理&源码剖析

并发编程-concurrent指南-阻塞队列-延迟队列DelayQueue

转: Java并发编程之二十一:并发新特性—阻塞队列和阻塞栈(含代码)

JUC并发编程之CompletableFuture基础用法