一、多线程下生产者消费者模式

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一、多线程下生产者消费者模式相关的知识,希望对你有一定的参考价值。

参考技术A 我们先来看看什么是生产者消费者模式,生产者消费者模式是程序设计中非常常见的一种设计模式,被广泛运用在解耦、消息队列等场景。在现实世界中,我们把生产商品的一方称为生产者,把消费商品的一方称为消费者,有时生产者的生产速度特别快,但消费者的消费速度跟不上,俗称“产能过剩”,又或是多个生产者对应多个消费者时,大家可能会手忙脚乱。如何才能让大家更好地配合呢?这时在生产者和消费者之间就需要一个中介来进行调度,于是便诞生了生产者消费者模式。

使用生产者消费者模式通常需要在两者之间增加一个阻塞队列作为媒介,有了媒介之后就相当于有了一个缓冲,平衡了两者的能力,整体的设计如图所示,最上面是阻塞队列,右侧的 1 是生产者线程,生产者在生产数据后将数据存放在阻塞队列中,左侧的 2 是消费者线程,消费者获取阻塞队列中的数据。而中间的 3 和 4 分别代表生产者消费者之间互相通信的过程,因为无论阻塞队列是满还是空都可能会产生阻塞,阻塞之后就需要在合适的时机去唤醒被阻塞的线程。

那么什么时候阻塞线程需要被唤醒呢?有两种情况。第一种情况是当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知所有的消费者,唤醒阻塞的消费者线程。另一种情况是如果生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据之后就相当于队列空了一个位置,这时消费者就会通知所有正在阻塞的生产者进行生产,这便是对生产者消费者模式的简单介绍。

BlockingQueue 实现生产者消费者模式看似简单,背后却暗藏玄机,我们在掌握这种方法的基础上仍需要掌握更复杂的实现方法。我们接下来看如何在掌握了 BlockingQueue 的基础上利用 Condition 实现生产者消费者模式,它们背后的实现原理非常相似,相当于我们自己实现一个简易版的 BlockingQueue:

wait/notify 实现生产者消费者模式的方法,实际上实现原理和Condition 是非常类似的,它们是兄弟关系:

多线程生产者消费者模式中,如何停止消费者?多生产者情况下对“毒丸”策略的应用。

生产者、消费者模式是多线程中的经典问题。通过中间的缓冲队列,使得生产者和消费者的速度可以相互调节。


对于比较常见的单生产者、多消费者的情况,主要有以下两种策略:

  1. 通过volatile boolean producerDone =false 来标示是否完成。生产者结束后标示为true, 消费者轮询这个变量来决定自己是否退出。 这种方式对producerDone产生比较大的争用,实现起来也有诸多问题需要考虑。

  2. 比较经典的“毒丸”策略,生产者结束后,把一个特别的对象:“毒丸”对象放入队列。消费者从队列中拿到对象后,判断是否是毒丸对象。如果是普通非毒丸对象,则正常消费。如果是毒丸对象,则放回队列(杀死其他消费者),然后结束自己。这种方式不会对结束状态产生争用,是比较好的方式。


由于“毒丸”策略是在单生产者多消费者情况下的。对于多生产者的情况,需要对之进行一些修改。我的想法是这样的。用Countdownlatch作为生产者计数器。所有生产者结束后,由协调者放入毒丸对象,消费者退出过程是一样的。上代码:


Coordinator: 启动生产者消费者,提供队列、计数器。生产者全部结束后,放入毒丸。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

public class Coordinator {
	public static final Object POISON_PILL = new Object();//special object to kill consumers
	private int productCount = 3;
	private int consumerCount = 5;

	public void startAll() throws Exception{
		BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(20);
		CountDownLatch noMoreToProduce = new CountDownLatch(productCount);
		//start consumers;
		for(int i = 0; i < consumerCount; i++){
			new Thread(new Consumer("consumer " + i, queue)).start();
		}
		//start producers;
		for(int i = 0; i < productCount; i++){
			new Thread(new Producer("producer " + i, queue, noMoreToProduce)).start();
		}
		//wait until all producer down
		noMoreToProduce.await();
		System.out.println("All producer finished, putting POISON_PILL to the queue to stop consumers!");
		//put poison pill
		queue.put(POISON_PILL);
	}
	
	public static void main(String[] args) throws Exception{
		new Coordinator().startAll();
	}
}


Producer: 随机生产和结束,结束前使countdownlatch + 1

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

public class Producer implements Runnable {
	private String name;
	private CountDownLatch noMoreToProduce;
	private BlockingQueue<Object> queue;
	private Random random = new Random();
	
	
	public Producer(String name, BlockingQueue<Object> queue, CountDownLatch noMoreToProduce){
		this.name = name;
		this.queue = queue;
		this.noMoreToProduce = noMoreToProduce;
	}

	@Override
	public void run() {
		System.out.println(name + " started.");
		try {
			while (true) {
				Object item = randomProduce();
				if (item == null) {
					break; //break if no more item
				}
				queue.put(item);
				System.out.println(name + " produced one.");
			}
		} catch (InterruptedException e) {
			//log
		} finally{
			System.out.println(name + " finished.");
			noMoreToProduce.countDown();//count down to signal "I finished."
		}
	}

	private Object randomProduce() {
		if (random.nextBoolean()) {
			return new Object();
		}
		return null;
	}
}


Consumer: 判断毒丸对象。如果是毒丸,放回队列(杀死其他消费者),然后自己退出。

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
	private String name;
	private BlockingQueue<Object> queue;
	
	public Consumer(String name, BlockingQueue<Object> queue){
		this.name = name;
		this.queue = queue;
	}

	@Override
	public void run() {
		try {
			System.out.println(name + " started.");
			while (true) {
				Object item = queue.take();
				//poison pill processing
				if (item == Coordinator.POISON_PILL) {
					queue.put(item);//put back to kill others
					System.out.println(name + " finished");
					break;
				}
				item = null;//pretend to consume the item;
				System.out.println(name + " consumed one");
			}
		} catch (InterruptedException e) {
			
		} 
	}
}


执行结果:

consumer 0 started.

consumer 4 started.

consumer 3 started.

consumer 2 started.

consumer 1 started.

producer 0 started.

producer 1 started.

producer 0 finished.

producer 1 produced one.

producer 2 started.

producer 1 produced one.

producer 1 finished.

consumer 3 consumed one

consumer 4 consumed one

consumer 0 consumed one

producer 2 produced one.

producer 2 produced one.

producer 2 produced one.

consumer 1 consumed one

consumer 2 consumed one

producer 2 finished.

All producer finished, putting POISON_PILL to the queue to stop consumers!

consumer 3 finished

consumer 4 finished

consumer 0 finished

consumer 2 finished

consumer 1 finished


本文出自 “ThinkDifferently” 博客,转载请与作者联系!

以上是关于一、多线程下生产者消费者模式的主要内容,如果未能解决你的问题,请参考以下文章

多线程生产者消费者模式中,如何停止消费者?多生产者情况下对“毒丸”策略的应用。

多线程:多线程设计模式:生产者-消费模式

多线程

用阻塞队列实现生产者消费者模式二(多线程消费)

Java多线程-同步:synchronized 和线程通信:生产者消费者模式

Java多线程:生产者消费者模型