生产者/消费者模式(阻塞队列)
Posted 学无止境
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了生产者/消费者模式(阻塞队列)相关的知识,希望对你有一定的参考价值。
生产消费者模式
貌似也是阻塞的问题
花了一些时间终于弄明白这个鸟东东,以前还以为是不复杂的一个东西的,以前一直以为和观察者模式差不多(其实也是差不多的,呵呵),生产消费者模式应该是可以通过观察者模式来实现的,对于在什么环境下使用现在想的还不是特别清楚,主要是在实际中还没使用过这个。
需要使用到同步,以及线程,属于多并发行列,和观察者模式的差异也就在于此吧,所以实现起来也主要在这里的差异。
参考地址:http://blog.csdn.net/program_think/
在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。
单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据
◇解耦
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
◇支持并发(concurrency)
生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。
使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种,后面的帖子会讲两种并发类型下的应用)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。其实当初这个模式,主要就是用来处理并发问题的。
◇支持忙闲不均
缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
用了两种方式实现了一下这个模式,主要参考了网上的一些例子才弄明白,这里对队列的实现有很多种方法,需要和具体的应用相结合吧,队列缓冲区很简单,现在已有大量的实现,缺点是在性能上面(内存分配的开销和同步/互斥的开销),下面的实现都是这种方式;环形缓冲区(减少了内存分配的开销),双缓冲区(减少了同步/互斥的开销)。
第一个例子是使用的信号量的东东,没有执行具体的东西,只是实现了这个例子,要做复杂的业务逻辑的话需要自己在某些方法内去具体实现
代码如下:
消费者:
- public class TestConsumer implements Runnable {
- TestQueue obj;
- public TestConsumer(TestQueue tq){
- this.obj=tq;
- }
- public void run() {
- try {
- for(int i=0;i<10;i++){
- obj.consumer();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
生产者:
- public class TestProduct implements Runnable {
- TestQueue obj;
- public TestProduct(TestQueue tq){
- this.obj=tq;
- }
- public void run() {
- for(int i=0;i<10;i++){
- try {
- obj.product("test"+i);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
队列(使用了信号量,采用synchronized进行同步,采用lock进行同步会出错,或许是还不知道实现的方法):
- public static Object signal=new Object();
- boolean bFull=false;
- private List thingsList=new ArrayList();
- private final ReentrantLock lock = new ReentrantLock(true);
- BlockingQueue q = new ArrayBlockingQueue(10);
- /**
- * 生产
- * @param thing
- * @throws Exception
- */
- public void product(String thing) throws Exception{
- synchronized(signal){
- if(!bFull){
- bFull=true;
- //产生一些东西,放到 thingsList 共享资源中
- System.out.println("product");
- System.out.println("仓库已满,正等待消费...");
- thingsList.add(thing);
- signal.notify(); //然后通知消费者
- }
- signal.wait(); // 然后自己进入signal待召队列
- }
- }
- /**
- * 消费
- * @return
- * @throws Exception
- */
- public String consumer()throws Exception{
- synchronized(signal){
- if(!bFull) {
- signal.wait(); // 进入signal待召队列,等待生产者的通知
- }
- bFull=false;
- // 读取buf 共享资源里面的东西
- System.out.println("consume");
- System.out.println("仓库已空,正等待生产...");
- signal.notify(); // 然后通知生产者
- }
- String result="";
- if(thingsList.size()>0){
- result=thingsList.get(thingsList.size()-1).toString();
- thingsList.remove(thingsList.size()-1);
- }
- return result;
- }
测试代码:
- public class TestMain {
- public static void main(String[] args) throws Exception{
- TestQueue tq=new TestQueue();
- TestProduct tp=new TestProduct(tq);
- TestConsumer tc=new TestConsumer(tq);
- Thread t1=new Thread(tp);
- Thread t2=new Thread(tc);
- t1.start();
- t2.start();
- }
- }
运行结果:
- product
- 仓库已满,正等待消费...
- consume
- 仓库已空,正等待生产...
- product
- 仓库已满,正等待消费...
- consume
- 仓库已空,正等待生产...
- product
- 仓库已满,正等待消费...
- consume
- 仓库已空,正等待生产...
- product
- 仓库已满,正等待消费...
- consume
- 仓库已空,正等待生产...
- product
- 仓库已满,正等待消费...
- consume
- 仓库已空,正等待生产...
- product
- 仓库已满,正等待消费...
- consume
- 仓库已空,正等待生产...
- product
- 仓库已满,正等待消费...
- consume
- 仓库已空,正等待生产...
- product
- 仓库已满,正等待消费...
- consume
- 仓库已空,正等待生产...
- product
- 仓库已满,正等待消费...
- consume
- 仓库已空,正等待生产...
- product
- 仓库已满,正等待消费...
- consume
- 仓库已空,正等待生产...
第二种发放使用java.util.concurrent.BlockingQueue类来重写的队列那个类,使用这个方法比较简单,并且性能上也没有什么问题。
这是jdk里面的例子
- * class Producer implements Runnable {
- * private final BlockingQueue queue;
- * Producer(BlockingQueue q) { queue = q; }
- * public void run() {
- * try {
- * while(true) { queue.put(produce()); }
- * } catch (InterruptedException ex) { ... handle ...}
- * }
- * Object produce() { ... }
- * }
- *
- * class Consumer implements Runnable {
- * private final BlockingQueue queue;
- * Consumer(BlockingQueue q) { queue = q; }
- * public void run() {
- * try {
- * while(true) { consume(queue.take()); }
- * } catch (InterruptedException ex) { ... handle ...}
- * }
- * void consume(Object x) { ... }
- * }
- *
- * class Setup {
- * void main() {
- * BlockingQueue q = new SomeQueueImplementation();
- * Producer p = new Producer(q);
- * Consumer c1 = new Consumer(q);
- * Consumer c2 = new Consumer(q);
- * new Thread(p).start();
- * new Thread(c1).start();
- * new Thread(c2).start();
- * }
- * }
jdk1.5以上的一个实现,使用了Lock以及条件变量等东西
- class BoundedBuffer {
- final Lock lock = new ReentrantLock();
- final Condition notFull = lock.newCondition();
- final Condition notEmpty = lock.newCondition();
- final Object[] items = new Object[100];
- int putptr, takeptr, count;
- public void put(Object x) throws InterruptedException {
- lock.lock();
- try {
- while (count == items.length)
- notFull.await();
- items[putptr] = x;
- if (++putptr == items.length) putptr = 0;
- ++count;
- notEmpty.signal();
- } finally {
- lock.unlock();
- }
- }
- public Object take() throws InterruptedException {
- lock.lock();
- try {
- while (count == 0)
- notEmpty.await();
- Object x = items[takeptr];
- if (++takeptr == items.length) takeptr = 0;
- --count;
- notFull.signal();
- return x;
- } finally {
- lock.unlock();
- }
- }
- }
以上是关于生产者/消费者模式(阻塞队列)的主要内容,如果未能解决你的问题,请参考以下文章