[Java并发编程实战] 阻塞队列 BlockingQueue(含代码,生产者-消费者模型)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Java并发编程实战] 阻塞队列 BlockingQueue(含代码,生产者-消费者模型)相关的知识,希望对你有一定的参考价值。
见贤思齐焉,见不贤而内自省也。—《论语》
PS: 如果觉得本文有用的话,请帮忙点赞,留言评论支持一下哦,您的支持是我最大的动力!谢谢啦~
Java5.0 增加了两种新的容器类型,它们是指:Queue 和 BlockingQueue。Queue 用来临时保存一组等待处理的元素。BlockingQueue 扩张了 Queue 接口,增加了可阻塞的插入和获取等操作。
BlockingQueue 通常运用于一个线程生产对象放入队列,另一个线程从队列获取对象并消费,这是典型的生产者消费者模型。
这里写图片描述
生产者线程持续生产新对象并插入队列,如果队列已满,那么插入对象的操作会一直阻塞,直到队列中出现可用的空间。
消费者线程持续从队列获取对象,如果队列为空,那么获取操作会一直阻塞,直到队列中出现可用的新对象。BlockingQueue 简化了生产者-消费者设计的实现过程,它支持任意数量的生产者和消费者。
BlockingQueue 的核心方法:
这里写图片描述
offer(E): 向队列插入元素,并返回插入成功与否。本方法不阻塞当前执行线程。
put(E) : 向队列插入元素,如果队列已满,则会阻塞当前线程直至元素加入队列。
take() : 获取队列的首位元素,如果队列为空,则阻塞当前线程直至队列有元素并取走。
poll():获取队列首个元素,指定时间内一旦数据可取,则立即返回;否则返回失败。
remove(E):删除队列中的元素,返回成功与否。
BlockingQueue 的实现
BlockingQueue是一个接口,所以你必须使用它的实现来使用它。它的实现包括以下几个:
ArrayBlockingQueue:基于数组实现的有界队列(FIFO),使用一把全局锁并行对 queue 读写操作,同时使用两个 Condition 阻塞队列为空时的取操作和队列为满时的写操作。
LinkedBlockingQueue:基于已链接节点的,范围上限为 Integer.MAX_VALUE 的 blocking queue(FIFO)。主要操作 put 和 take 都是阻塞的。
DelayQueue:当指定的延迟时间到了,才能够从队列中获取元素。它没有大小限制,因此插入元素时不会阻塞,而只有获取元素时才会被阻塞。它的用法可以参考下面两篇博客:
http://www.cnblogs.com/jobs/archive/2007/04/27/730255.html,
http://www.cnblogs.com/sunzhenchao/p/3515085.htmlPriorityBlockingQueue: 基于优先级的阻塞队列,但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。
SynchronoutQueue:它的内部同时只能够容纳单个元素。如果该队列已有一个元素的话,试图向队列插入一个新元素线程会阻塞,知道另一个线程将该元素从队列中拿走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,知道另一个线程向队列中插入一个新的元素。SynchronousQueue适合一对一的匹配场景,没有容量,无法缓存。它的用法强烈推荐博客:
http://www.cnblogs.com/leesf456/p/5560362.html
BlockingQueue的使用
这是一个使用 BlockingQueue 的例子,本例使用 ArrayBlockingQueue 实现。首先,BlockingQueueTest 创建一个生产者线程 Procucer, 把字符存放进共享队列。然后创建三个消费者线程 Consumer,把字符串从队列中取出。Consumer 取到最后一个字符串时,中断所有消费者线程,结束程序。
1import java.util.ArrayList;
2import java.util.Iterator;
3import java.util.List;
4import java.util.Vector;
5import java.util.concurrent.ArrayBlockingQueue;
6import java.util.concurrent.BlockingQueue;
7
8public class BlockingQueueTest {
9 //队列容量
10 private static final int SIZE = 5;
11 private static final int CONSUMER_SIZE = 3;
12 //消费者线程退出标志
13 private static String endString = "num:" + (SIZE*2-1);
14 //存放消费者线程
15 private static List list = new ArrayList<Thread>();
16
17 public static void main(String[] args) throws Exception{
18 //创建固定长度的阻塞队列
19 BlockingQueue q = new ArrayBlockingQueue<String>(SIZE);
20
21 //创建生产者
22 Producer producer = new Producer(q);
23 //启动生产者线程,生产对象
24 producer.start();
25
26 //启动消费者线程,获取队列对象
27 for(int i = 0; i < CONSUMER_SIZE; i++) {
28 list.add(new Consumer(q));
29 ((Thread) list.get(i)).start();
30 }
31 }
32
33 //中断线程
34 public static void shutDownThread() {
35 for(int i = 0; i < CONSUMER_SIZE; i++) {
36 ((Thread) list.get(i)).interrupt();
37 }
38 }
39
40 static class Producer extends Thread{
41
42 private BlockingQueue queue = null;
43
44 public Producer(BlockingQueue q) {
45 this.queue = q;
46 }
47
48 @Override
49 public void run() {
50 // TODO Auto-generated method stub
51 try {
52 //生产10个对象,放进队列
53 for(int i = 0; i < SIZE*2; i++) {
54 String str = "num:" + i;
55 System.out.println(Thread.currentThread().getName() +":"+"IN: " + str);
56 queue.put(str);
57 Thread.sleep(100);
58 }
59 } catch (InterruptedException e) {
60 // TODO Auto-generated catch block
61 e.printStackTrace();
62 }
63
64 }
65 }
66
67 //消费者线程
68 static class Consumer extends Thread{
69
70 private BlockingQueue queue = null;
71
72 public Consumer(BlockingQueue q) {
73 this.queue = q;
74 }
75
76 @Override
77 public void run() {
78 // TODO Auto-generated method stub
79 try {
80 //获取队列的元素,队列为空时会阻塞
81 while(true) {
82 String str = (String) queue.take();
83 System.out.println(Thread.currentThread().getName() + ":" + "OUT " + str);
84 //已经取出最后一个元素,消费者线程应该退出,否则程序一直在运行
85 if(str.equals(endString)) {
86 shutDownThread();//中断线程
87 }
88 }
89 } catch (InterruptedException e) {
90 // TODO Auto-generated catch block
91 e.printStackTrace();
92 }
93 }
94 }
95}
执行结果:
这里写图片描述
总结
java.util.concurrent 中实现的阻塞队列包含了足够的同步机制,从而能够安全的将对象从生产者线程发布到消费者线程。对于可变对象,生产者-消费者模型,把对象的所有权安全的从生产者交付给消费者。转移后,消费者线程获得这个对象的所有权(独占访问权,可以任意修改它),并且生产者线程不会再访问它。同时,阻塞队列负责所有的控制流,使得消费者生产者的代码更加简单和清晰。
本文完毕,如对你有帮助,请关注我,谢谢~
参考
https://blog.csdn.net/defonds/article/details/44021605/
http://www.cnblogs.com/leesf456/p/5428630.html
《并发编程实战》
以上是关于[Java并发编程实战] 阻塞队列 BlockingQueue(含代码,生产者-消费者模型)的主要内容,如果未能解决你的问题,请参考以下文章