Java并发编程从入门到精通 - 第5章:多线程之间的交互:线程阀

Posted kehuaihan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程从入门到精通 - 第5章:多线程之间的交互:线程阀相关的知识,希望对你有一定的参考价值。

详述:
 线程阀是一种线程与线程之间相互制约和交互的机制;
 作用:http://wsmajunfeng.iteye.com/blog/1629354
阻塞队列BlockingQueue;
数组阻塞队列ArrayBlockingQueue;
链表阻塞队列LinkedBlockingQueue;
优先级阻塞队列PriorityBlockingQueue;
延时队列DelayQueue;
同步队列SynchronousQueue;
链表双向阻塞队列LinkedBlockingDeque;
链表传输队列LinkedTransferQueue;
同步计数器CountDownLatch;
抽象队列化同步器AbstractQueuedSynchroizer;
同步计数器Semaphore;
同步计数器CyclicBarrier;

技术分享图片
 1 /**
 2  * ArrayBlockingQueue的简单用法
 3  */
 4 package thread04;
 5 
 6 import java.util.concurrent.ArrayBlockingQueue;
 7 import java.util.concurrent.BlockingQueue;
 8 
 9 public class ArrayBlockingQueueTest01
10 {
11     public static void main(String[] args) throws InterruptedException
12     {
13         // 新建一个等待队列
14         final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(16);
15         
16         for(int i=0;i<16;i++)
17         {
18             String log = (i+1) + " --> ";
19             bq.put(log);
20         }
21         
22         // 新建四个线程
23         for(int i=0;i<4;i++)
24         {
25             new Thread(new Runnable()
26             {
27                 @Override
28                 public void run()
29                 {
30                     while(true)
31                     {
32                         try
33                         {
34                             String log = bq.take();
35                             parseLog(log);
36                         } 
37                         catch (InterruptedException e)
38                         {
39                             e.printStackTrace();
40                         }
41                     }
42                 }
43             }).start();
44         }
45     }
46     
47     public static void parseLog(String log)
48     {
49         System.out.println(log + System.currentTimeMillis());
50         
51         try
52         {
53             Thread.sleep(1000);
54         } 
55         catch (InterruptedException e)
56         {
57             e.printStackTrace();
58         }
59     }
60 }
ArrayBlockingQueue的简单用法
技术分享图片
 1 /**
 2  * LinkedBlockingQueue的简单用法
 3  */
 4 package thread04;
 5 
 6 import java.util.concurrent.BlockingQueue;
 7 import java.util.concurrent.LinkedBlockingQueue;
 8 
 9 public class LinkedBlockingQueueTest01
10 {
11     public static void main(String[] args)
12     {
13         final BlockingQueue<String> bq = new LinkedBlockingQueue<String>(16);
14         
15         for(int i=0;i<16;i++)
16         {
17             String log = (i+1) + " --> ";
18             try
19             {
20                 bq.put(log);
21             } 
22             catch (InterruptedException e)
23             {
24                 e.printStackTrace();
25             }
26         }
27         
28         for(int i=0;i<4;i++)
29         {
30             new Thread(new Runnable()
31             {
32                 @Override
33                 public void run()
34                 {
35                     while(true)
36                     {
37                         try
38                         {
39                             String log = bq.take();
40                             parseLog(log);
41                         } 
42                         catch (InterruptedException e)
43                         {
44                             e.printStackTrace();
45                         }
46                     }
47                 }
48                 
49             }).start();
50         }
51     }
52     
53     public static void parseLog(String log)
54     {
55         System.out.println(log + System.currentTimeMillis());
56         
57         try
58         {
59             Thread.sleep(1000);
60         } 
61         catch (InterruptedException e)
62         {
63             e.printStackTrace();
64         }
65     }
66 }
LinkedBlockingQueue的简单用法
技术分享图片
 1 /**
 2  * DelayQueue的简单用法
 3  */
 4 package thread04;
 5 
 6 import java.util.concurrent.DelayQueue;
 7 import java.util.concurrent.Delayed;
 8 import java.util.concurrent.TimeUnit;
 9 
10 public class DelayQueueTest01
11 {
12     public static void main(String[] args)
13     {
14         // 新建一个等待队列
15         final DelayQueue<Student> dq = new DelayQueue<Student>();
16         
17         for(int i=0;i<5;i++)
18         {
19             Student student = new Student("学生"+i, Math.round((Math.random()*10+i)));
20             dq.put(student);  // 将数据存到队列里面
21         }
22         
23         // 获取但不移除此队列的头部;如果此队列为空,则返回null
24         System.out.println("dq.peek():" + dq.peek().getName());
25         
26         // 获取并移除此队列的头部,在可从此队列获得到期延迟的元素,获得到达指定的等待时间之前一直等待(如有必要)
27         // poll(long timeout, TimeUnit unit)
28     }
29 }
30 
31 class Student implements Delayed  // 必须实现Delayed接口
32 {
33     private String name;
34     private long submitTime;  // 交卷时间
35     private long workTime;  // 考试时间
36     
37     public String getName()
38     {
39         return this.name + "交卷,用时" + workTime;
40     }
41     
42     public Student(String name, long submitTime)
43     {
44         this.name = name;
45         this.workTime = submitTime;
46         this.submitTime = TimeUnit.NANOSECONDS.convert(submitTime, TimeUnit.MILLISECONDS) + System.nanoTime();
47         System.out.println(this.name + "交卷,用时" + workTime);
48     }
49     
50     @Override
51     // 必须实现compareTo()方法
52     public int compareTo(Delayed o)
53     {
54         // 比较的方法
55         Student that = (Student) o;
56         return submitTime > that.submitTime ? 1 : (submitTime < that.submitTime ? -1 : 0);
57     }
58 
59     @Override
60     // 必须实现getDelay()方法
61     public long getDelay(TimeUnit unit)
62     {
63         // 返回一个延时时间
64         return unit.convert(submitTime - System.nanoTime(), unit.NANOSECONDS);
65     }
66     
67 }
68 
69 /*
70 每次运行结果都不一样,我们获得永远是队列里面的第一个元素
71 */
DelayQueue的简单用法
技术分享图片
 1 /**
 2  * SynchronousQueue的简单用法
 3  */
 4 package thread04;
 5 
 6 import java.util.concurrent.Semaphore;
 7 import java.util.concurrent.SynchronousQueue;
 8 
 9 public class SynchronousQueueTest01
10 {
11     public static void main(String[] args)
12     {
13         System.out.println("begin:" + System.currentTimeMillis() / 1000);
14         
15         final SynchronousQueue<String> sq = new SynchronousQueue<String>();
16         final Semaphore sem = new Semaphore(1);
17         
18         /*
19         // 放在不同的地方,结果是不一样的
20         for(int i=0;i<10;i++)
21         {
22             String input = i + "";
23             try
24             {
25                 sq.put(input);
26             } 
27             catch (InterruptedException e)
28             {
29                 e.printStackTrace();
30             }
31         }
32         */
33         
34         for(int i=0;i<10;i++)
35         {
36             new Thread(new Runnable()
37             {
38                 @Override
39                 public void run()
40                 {
41                     try
42                     {
43                         sem.acquire();
44                         
45                         String input = sq.take();
46                         String output = TestDo.doSome(input);
47                         System.out.println(Thread.currentThread().getName() + ":" + output);
48                         
49                         sem.release();
50                     } 
51                     catch (InterruptedException e)
52                     {
53                         e.printStackTrace();
54                     }
55                     
56                 }
57             }).start();
58         }
59         
60         /*for(int i=0;i<10;i++)
61         {
62             String input = i + "";
63             try
64             {
65                 sq.put(input);
66             } 
67             catch (InterruptedException e)
68             {
69                 e.printStackTrace();
70             }
71         }*/
72         
73     }
74     
75 }
76 
77 class TestDo
78 {
79     public static String doSome(String input)
80     {
81         try
82         {
83             Thread.sleep(1000);
84         } 
85         catch (InterruptedException e)
86         {
87             e.printStackTrace();
88         }
89         
90         String output = input + ":" + System.currentTimeMillis();
91         
92         return output;
93     }
94 }
SynchronousQueue的简单用法
技术分享图片
 1 /**
 2  * LinkedTransferQueue的简单使用
 3  */
 4 package thread04;
 5 
 6 import java.util.Random;
 7 import java.util.concurrent.LinkedTransferQueue;
 8 import java.util.concurrent.TimeUnit;
 9 
10 public class LinkedTransferQueueTest01
11 {
12     public static void main(String[] args)
13     {
14         LinkedTransferQueue<String> queue = new LinkedTransferQueue<String>();
15         
16         Producer p = new Producer(queue);
17         Thread producer = new Thread(p);
18         // 设为守护进程,使得线程执行结束后程序自动结束
19         producer.setDaemon(true);
20         producer.start();
21         
22         for(int i=0;i<10;i++)
23         {
24             Consumer c = new Consumer(queue);
25             Thread consumer = new Thread(c);
26             consumer.setDaemon(true);
27             consumer.start();
28             
29             try
30             {
31                 // 消费者进程休眠1秒,以便生产者获得CPU,从而生产产品
32                 Thread.sleep(1000);
33             } 
34             catch (InterruptedException e)
35             {
36                 e.printStackTrace();
37             }
38         }
39     }
40 }
41 
42 class Consumer implements Runnable
43 {
44     private LinkedTransferQueue<String> queue;
45     
46     public Consumer(LinkedTransferQueue<String> queue)
47     {
48         this.queue = queue;
49     }
50     
51     @Override
52     public void run()
53     {
54         try
55         {
56             System.out.println(" Consumer " + Thread.currentThread().getName() + " " + queue.take());
57         } 
58         catch (InterruptedException e)
59         {
60             e.printStackTrace();
61         }
62     }
63 }
64 
65 class Producer implements Runnable
66 {
67     private LinkedTransferQueue<String> queue;
68     
69     public Producer(LinkedTransferQueue<String> queue)
70     {
71         this.queue = queue;
72     }
73     
74     private String produce()
75     {
76         return " your lucky number " + (new Random().nextInt(1000));
77     }
78     
79     @Override
80     public void run()
81     {
82         try
83         {
84             while(true)
85             {
86                 if(queue.hasWaitingConsumer())
87                 {
88                     queue.transfer(produce());
89                 }
90                 
91                 TimeUnit.SECONDS.sleep(1);
92             }
93         } 
94         catch (Exception e)
95         {
96             e.printStackTrace();
97         }
98     }
99 }
LinkedTransferQueue的简单使用

同步计数器CountDownLatch:
 详解:
  在完成一组正在其他线程中执行的操作之前,允许一个或多个线程一直等待;
  用给定的计数初始化CountDownLatch;由于调用了countDown()方法,所以在当前计数到达0之前,awiait()方法会一直受阻塞(哪个线程中调用了await()方法,哪个线程就被阻塞,一直等待);之后(当前计数到达0),会释放所有等待的线程,await的所有后续调用(线程调用await()方法后续的一些操作)都将立即返回;这种现象只出现一次,因为计数无法被重置;
 主要方法:
  CountDownLatch(int count) :构造一个用给定计数初始化的CountDownLatch实例对象;
  void await():使当前线程在锁存器倒计数至0之前一直等待,除非线程被中断;
  boolean await(long timeout, TimeUnit unit) :使当前线程在锁存器倒计数至0之前一直等待,除非线程被中断或超出了指定的等待时间;
  void countDown():递减锁存器的计数,如果计数到达0,则释放所有等待的线程;
  long getCount():返回当前计数;
 应用场景:
  在一些应用场合中,需要等待某个条件到达要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作;

技术分享图片
 1 /**
 2  * CountDownLatch的简单使用
 3  */
 4 package thread04;
 5 
 6 import java.util.concurrent.CountDownLatch;
 7 
 8 public class CountDownLatchTest01
 9 {
10     private static CountDownLatch latch = new CountDownLatch(3);
11     
12     public static void main(String[] args)
13     {
14         Worker worker1 = new Worker("张三 程序员1", latch);
15         Worker worker2 = new Worker("李四  程序员2", latch);
16         Worker worker3 = new Worker("王五  程序员3", latch);
17         
18         Thread t1 = new Thread(worker1);
19         Thread t2 = new Thread(worker2);
20         Thread t3 = new Thread(worker3);
21         
22         t1.start();
23         t2.start();
24         t3.start();
25         
26         try
27         {
28             latch.await();
29         } catch (InterruptedException e)
30         {
31             e.printStackTrace();
32         }
33         
34         System.out.println("Main is end!");
35     }
36 }
37 
38 class Worker implements Runnable
39 {
40     private String workerName;
41     private CountDownLatch latch;
42     
43     public Worker(String workerName, CountDownLatch latch)
44     {
45         this.workerName = workerName;
46         this.latch = latch;
47     }
48     
49     @Override
50     public void run()
51     {
52         try
53         {
54             System.out.println("Worker " + workerName + " is begin!");
55             Thread.sleep(1000L);
56             System.out.println("Worker " + workerName + " is end!");
57             
58         } catch (InterruptedException e)
59         {
60             e.printStackTrace();
61         }
62         latch.countDown();
63     }
64 }
CountDownLatch的简单使用

信号量Semaphore:
 详解:
  通过对信号量的不同操作,可以分别实现进程间的互斥与同步;
  Semaphore可以控制某个资源被同时访问的任务数(维护了一个许可集合),它通过acquire()获取一个许可,release()释放一个许可;如果被同时访问的任务数已满,则其他acquire的任务进入等待状态,直到有一个任务被release掉,它才能得到许可;
 使用场景:
  排队场景,资源有限的房间,资源有限的群等等,常见的实际应用场景包括线程池、连接池等;

技术分享图片
 1 /**
 2  * Semaphore的简单使用
 3  */
 4 package thread04;
 5 
 6 import java.util.concurrent.ExecutorService;
 7 import java.util.concurrent.Executors;
 8 import java.util.concurrent.Semaphore;
 9 import java.util.concurrent.locks.ReentrantLock;
10 
11 public class SemaphoreTest01
12 {
13     public static void main(String[] args)
14     {
15         ExecutorService es = Executors.newCachedThreadPool();
16         
17         final Semaphore sh = new Semaphore(5);
18         
19         ReentrantLock rl = new ReentrantLock();
20         
21         for(int i=0;i<10;i++)
22         {
23             final int num = i;
24             
25             Runnable run = new Runnable()
26             {
27                 @Override
28                 public void run()
29                 {
30                     // rl.lock();
31                     try
32                     {
33                         sh.acquire();
34                         System.out.println("线程 " + Thread.currentThread().getName() + " 获得许可:" + num);
35                         for(int i=0;i<999999;i++);
36                         sh.release();
37                         System.out.println("线程 " + Thread.currentThread().getName() + "释放许可:" + num);
38                         System.out.println("当前允许进入的任务个数: " + sh.availablePermits());
39                     }
40                     catch (InterruptedException e)
41                     {
42                         e.printStackTrace();
43                     }
44                     finally
45                     {
46                         // rl.unlock();
47                     }
48                 }
49             };
50             es.execute(run);
51         }
52         
53         es.shutdown();
54     }
55 }
Semaphore的简单使用

障碍器CyclicBarrier:
 详解:
  又叫同步计数器;
 使用场景:
  你希望创建一组任务,它们并发地执行工作,另外的一个任务在这一组任务并发执行结束前一直阻塞等待,直到该组任务全部执行结束,这个任务才得以执行;

技术分享图片
 1 /**
 2  * CyclicBarrier的简单使用
 3  */
 4 package thread04;
 5 
 6 import java.util.concurrent.BrokenBarrierException;
 7 import java.util.concurrent.CyclicBarrier;
 8 
 9 public class CyclicBarrierTest01
10 {
11     public static void main(String[] args)
12     {
13         // 创建CyclicBarrier对象,并设置执行完一组5个线程的并发任务后,再执行MainTask任务 
14         CyclicBarrier cb = new CyclicBarrier(5, new MainTask());
15         
16         SubTask sb1 = new SubTask("A", cb);
17         SubTask sb2 = new SubTask("B", cb);
18         SubTask sb3 = new SubTask("C", cb);
19         SubTask sb4 = new SubTask("D", cb);
20         SubTask sb5 = new SubTask("E", cb);
21         
22         new Thread(sb1).start();
23         new Thread(sb2).start();
24         new Thread(sb3).start();
25         new Thread(sb4).start();
26         new Thread(sb5).start();
27         
28     }
29 }
30 
31 /**
32  * 最后要执行的任务
33  * @author Administrator
34  *
35  */
36 class MainTask implements Runnable
37 {
38     @Override
39     public void run()
40     {
41         System.out.println("前面的并发任务全部执行完毕后,开始执行最后任务...");
42     }
43 }
44 
45 /**
46  * 一组并发任务
47  * @author Administrator
48  *
49  */
50 class SubTask implements Runnable
51 {
52     private String name;
53     private CyclicBarrier cb;
54     
55     public SubTask(String name, CyclicBarrier cb)
56     {
57         this.name = name;
58         this.cb = cb;
59     }
60     
61     @Override
62     public void run()
63     {
64         System.out.println("并发任务 " + name + " 开始执行...");
65         for(int i=0;i<999999;i++);
66         System.out.println("并发任务 " + name + " 执行完毕,通知障碍器...");
67         
68         try
69         {
70             // 每执行完一项任务就通知障碍器
71             cb.await();
72         }
73         catch (InterruptedException | BrokenBarrierException e)
74         {
75             e.printStackTrace();
76         }
77     }
78 
79     public String getName()
80     {
81         return name;
82     }
83     public void setName(String name)
84     {
85         this.name = name;
86     }
87     public CyclicBarrier getCb()
88     {
89         return cb;
90     }
91     public void setCb(CyclicBarrier cb)
92     {
93         this.cb = cb;
94     }
95 }
CyclicBarrier的简单使用


































以上是关于Java并发编程从入门到精通 - 第5章:多线程之间的交互:线程阀的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程从入门到精通 - 第6章:线程池

Java并发编程从入门到精通-总纲

Java并发编程从入门到精通 张振华.Jack --我的书

Java并发编程从入门到精通 - 第2章:认识Thread

Java并发编程从入门到精通 - 第7章:Fork/Join框架

2018年java架构师分布式性能优化 附带源码