Java生产者消费者模式

Posted 战斗的小白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java生产者消费者模式相关的知识,希望对你有一定的参考价值。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。

生产者消费者模式实战

利用BlockingQueue

package com;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class Comsumer implements Runnable   {
	private BlockingQueue<PCData> queue;
	private static final int SLEEPTIME = 1000;
	public Comsumer(BlockingQueue<PCData> queue) {
		this.queue = queue;
	}
	@Override
	public void run() {
		System.out.println("start Consumer Id:"+Thread.currentThread().getId());
		Random r = new Random();
		Boolean isrunning = true;
		try {
			while(isrunning){
			PCData data = queue.take();
			if(data != null){
				System.out.println("Comsumer data:"+data);
				Thread.sleep(r.nextInt(SLEEPTIME));
			}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
            Thread.currentThread().interrupt();
		}
	}
}

  

package com;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable{
	private volatile boolean isRunning = true;
	//内存缓冲区
	private BlockingQueue<PCData> queue;
	//总数  AtomicInteger
	private static AtomicInteger count = new AtomicInteger();
	private static final int SLEEPTIME = 1000;
	
	public Producer(BlockingQueue<PCData> queue){
		this.queue = queue;
	}
	
	public void run(){
		PCData data = null;
		Random r = new Random();
		System.out.println("start producting id:"+ Thread.currentThread().getId());
		while(isRunning){
			try {
				while(isRunning){
				Thread.sleep(r.nextInt(SLEEPTIME));
				data = new PCData(count.incrementAndGet());
				if(!queue.offer(data,2,TimeUnit.SECONDS)){
					System.out.println("加入队列失败");
				}else{
					System.out.println("Producer data:"+data);
				}
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
				Thread.currentThread().interrupt();
			}
						}
		}
	
	public void stop(){
		isRunning = false;
	}
}
	
	

  

package com;

public class PCData {
	private final int intData;
	public PCData(int d){
		intData = d;
	}
	
	public PCData(String d){
		intData = Integer.valueOf(d);
	}
	
	public int getData(){
		return intData;
	}
	
	@Override
	public String toString(){
		return ""+intData;
	}
}

  

package com;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {
	public static void main(String[] args) throws InterruptedException{
		BlockingQueue<PCData> queue = new LinkedBlockingQueue<>(10); 
		Producer p1 = new Producer(queue);
		Producer p2 = new Producer(queue);
		Producer p3 = new Producer(queue);
		Comsumer c1 = new Comsumer(queue);
		Comsumer c2 = new Comsumer(queue);
		Comsumer c3 = new Comsumer(queue);
		ExecutorService service = Executors.newCachedThreadPool();
		service.execute(p1);
		service.execute(p2);
		service.execute(p3);
		service.execute(c1);
		service.execute(c2);
		service.execute(c3);
		Thread.sleep(10*1000);
		p1.stop();
		p2.stop();
		p3.stop();
		Thread.sleep(3000);
		service.shutdown();
	}
}

  利用notifyAll和wait

package com;

import java.util.List;

public class Consumer implements Runnable{
	private List<PCData> queue;
	public  Consumer(List<PCData> queue){
		this.queue = queue;
	}

	@Override
	public void run() {
	while(true){
		PCData data = null;
		try {
		synchronized (queue) {
			if(queue.size() == 0){
				System.out.println(Thread.currentThread().getId()+"队列为空,无法消费");
				queue.notifyAll();
				queue.wait();
			}else{
				data = queue.remove(0);
				System.out.println(Thread.currentThread().getId()+"消费:"+data);
			}
		}
		Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	}
}

  

package com;

import java.util.List;
import java.util.Random;

public class Producer implements Runnable {
	private List<PCData> queue;
	private int length;
	
	public Producer(List<PCData> queue,int length){
		this.queue = queue;
		this.length = length;
	}

	@Override
	public void run() {
			while(true){           
				Random r = new Random();
				PCData data = new PCData(r.nextInt(100));
				try {
				synchronized (queue) {
					if(queue.size() >= length){
						System.out.println(Thread.currentThread().getId()+"队列满了,无法加入 ");
						queue.notifyAll();
						queue.wait();
					}else{
						queue.add(data);
						System.out.println(Thread.currentThread().getId()+"生产了:"+data);
					}
				}
				Thread.sleep(1000);
				} catch (InterruptedException e){
					e.printStackTrace();
				}
			}
		
		
	}
	
	
}

  

package com;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
	public static void main(String[] args){
		List<PCData> queue = new ArrayList<>();
		int length =10;
		Producer p1 = new Producer(queue, length);
		Producer p2 = new Producer(queue, length);
		Producer p3 = new Producer(queue, length);
		Consumer c1 = new Consumer(queue);
		Consumer c2 = new Consumer(queue);
		Consumer c3 = new Consumer(queue);
		ExecutorService service = Executors.newCachedThreadPool();
		service.execute(p1);
		service.execute(p2);
		service.execute(p3);
		service.execute(c1);
		service.execute(c2);
		service.execute(c3);
	}
}

  

 


以上是关于Java生产者消费者模式的主要内容,如果未能解决你的问题,请参考以下文章

Java多线程:生产者消费者模型

java生产者/消费者模式实现——一生产者一消费者(操作值)

Java并发多线程编程——生产者消费者模式示例(阻塞队列版本)

[Java基础]生产者和消费者模式概述与案例分析

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

java——利用生产者消费者模式思想实现简易版handler机制