实现生产者消费者模式的方法
Posted zgq25302111
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实现生产者消费者模式的方法相关的知识,希望对你有一定的参考价值。
1、使用 BlockingQueue
ArrayBlockingQueue 完成了很多工作,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。
import java.util.concurrent.*; public class MyProdCons { public static void main(String[] args){ BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10); Runnable producer = () ->{ int num1 = 0; while(true) { try { queue.put(new Object()); System.out.println("生产个数: " + num1++); }catch(InterruptedException e){ e.printStackTrace(); } } }; new Thread(producer).start(); new Thread(producer).start(); Runnable consumer = () ->{ int num2 = 0; while(true) { try { queue.take(); System.out.println("消费个数: " + num2++); }catch(InterruptedException e) { e.printStackTrace(); } } }; new Thread(consumer).start(); new Thread(consumer).start(); } }
2、使用 Condition
import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; public class MyBlockingQueueForCondition { private Queue<Object> queue; private int max = 16; private ReentrantLock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition(); private Condition notFull = lock.newCondition(); public MyBlockingQueueForCondition(int size) { this.max = size; queue = new LinkedList(); } public void put(Object o) throws InterruptedException{ lock.lock(); try { while(queue.size() == max) { notFull.await(); } queue.add(o); notEmpty.signalAll(); }finally{ lock.unlock(); } } public Object take() throws InterruptedException{ lock.lock(); try { while(queue.size() == 0) { notEmpty.await(); } Object item = queue.remove(); notFull.signalAll(); return item; }finally { lock.unlock(); } } }
3、使用 wait/notify
public class WaitStyle { public static void main(String[] args) { MyBlockingQueue myBlockingQueue = new MyBlockingQueue(5); Producer producer = new Producer(myBlockingQueue); Consumer consumer = new Consumer(myBlockingQueue); new Thread(producer).start(); new Thread(consumer).start(); } } class Producer implements Runnable{ private MyBlockingQueue storage; public Producer(MyBlockingQueue storage) { this.storage = storage; } public void run() { for(int i=0;i<10;i++) { try { storage.put(); }catch(InterruptedException e) { e.printStackTrace(); } } } } class Consumer implements Runnable{ private MyBlockingQueue storage; public Consumer(MyBlockingQueue storage) { this.storage = storage; } public void run() { for(int i=0;i<10;i++) { try { storage.take(); }catch(InterruptedException e) { e.printStackTrace(); } } } } import java.util.*; public class MyBlockingQueue { private int maxSize; private LinkedList<Object> storage; public MyBlockingQueue(int size) { this.maxSize = size; storage = new LinkedList<>(); } public synchronized void put() throws InterruptedException{ while(storage.size() == maxSize) { wait(); } storage.add(new Object()); System.out.println("放入后个数 " + storage.size()); notifyAll(); } public synchronized void take() throws InterruptedException{ while(storage.size() == 0) { wait(); } //System.out.println(storage.remove()); storage.remove(); System.out.println("取出后个数 " + storage.size()); notifyAll(); } }
以上是三种实现生产者消费者模式的方法,其中,第一种 BlockingQueue 模式实现比较简单,但其背后的实现原理在第二种、第三种实现方法中得以体现,第二种、第三种实现方法本质上是我们自己实现了 BlockingQueue 的一些核心逻辑,供生产者与消费者使用。
ref:
拉勾课堂 徐隆曦 《Java 并发编程 78 讲》
以上是关于实现生产者消费者模式的方法的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段