java多线程笔记--生产者消费组模型
Posted SingleOneMan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java多线程笔记--生产者消费组模型相关的知识,希望对你有一定的参考价值。
java多线程笔记–生产者消费者模型
文章目录
参考:
https://www.cnblogs.com/jiangyang/p/6007030.html
http://www.importnew.com/27063.html
https://juejin.im/entry/596343686fb9a06bbd6f888c
1.利用BlockingQueue实现
/**
* @Description: 1)使用阻塞队列BlockingQueue实现生产者消费者模型
* 2)由于操作“出队/入队+日志输出”不是原子的,所以上述日志的绝对顺序与实际的出队/入队顺序有出入
* 3)同一产品的消费一定发生在生产之后
* @Author:
* @Date: 2019/5/27 0:43
*/
public class PCByBlockingQueue
public static void main(String[] args)
Factory factory = new Factory(10);
new Thread(factory.NewProducer()).start();
new Thread(factory.NewProducer()).start();
new Thread(factory.NewConsuner()).start();
new Thread(factory.NewConsuner()).start();
public static class Factory
private final AtomicInteger increNo = new AtomicInteger(0);
private BlockingQueue<Product> resourceQueue;
public Factory(int capacity)
this.resourceQueue = new LinkedBlockingQueue<>(capacity);
public Producer NewProducer()
return new Producer();
public Consuner NewConsuner()
return new Consuner();
private class Product
public int id;
public Product(int id)
this.id = id;
private class Producer implements Runnable
@Override
public void run()
while (true)
try
// 随机延时模拟生产端逻辑耗时(long) (Math.random() * 1000)
Thread.sleep((long) (Math.random() * 1000));
Product product = new Product(increNo.getAndIncrement());
resourceQueue.put(product);
System.out.println("生产者" + Thread.currentThread().getName()
+ "生产资源:" + product.id + ";当前资源池有" + resourceQueue.size() +
"个资源");
catch (InterruptedException e)
e.printStackTrace();
private class Consuner implements Runnable
@Override
public void run()
while (true)
try
Product product = resourceQueue.take();
//模拟服务器处理过程
Thread.sleep(500);
System.out.println("消费者" + Thread.currentThread().getName() +
"消费资源:" + product.id + ";当前资源池有" + resourceQueue.size()
+ "个资源");
catch (InterruptedException e)
e.printStackTrace();
2.利用Wait Notify实现
/**
* @Description: 2)相比于BlockingQueue实现,性能降低,原因BlockingQueue内部使用2个锁,可以生产线程和消费线程并行
* 而lock和wait notify实现都只有一个锁,生产线程和消费线程互斥
* @Author:
* @Date: 2019/5/27 0:42
*/
public class PCbyWaitNotify
public static void main(String[] args)
Factory factory = new Factory(4);
new Thread(factory.NewProducer()).start();
new Thread(factory.NewProducer()).start();
new Thread(factory.NewConsuner()).start();
new Thread(factory.NewConsuner()).start();
public static class Factory
private final AtomicInteger increNo = new AtomicInteger(0);
private final Object STORE_LOCK = new Object();
//用于存储产品
private final Queue<Product> store = new LinkedList<>();
int capacity;
public Factory(int capacity)
this.capacity = capacity;
public Producer NewProducer()
return new Producer();
public Consuner NewConsuner()
return new Consuner();
private class Product
public int id;
public Product(int id)
this.id = id;
private class Producer implements Runnable
@Override
public void run()
while (true)
//模拟生产处理过程
try
Thread.sleep(500);
catch (InterruptedException e)
e.printStackTrace();
synchronized (STORE_LOCK)
//store满了,生产进入阻塞等待,生产线程进入可运行状态,等待cpu调度
try
while (store.size() == capacity)
STORE_LOCK.wait();
Product product = new Product(increNo.getAndIncrement());
store.offer(product);
System.out.println("生产者" + Thread.currentThread().getName()
+ "生产资源:" + product.id);
//唤醒等待的消费者
STORE_LOCK.notifyAll();
catch (InterruptedException e)
e.printStackTrace();
private class Consuner implements Runnable
@Override
public void run()
while (true)
synchronized (STORE_LOCK)
try
//store空了,消费线程进入阻塞等待,消费线程进入可运行状态,等待cpu调度
while (store.size() == 0)
STORE_LOCK.wait();
Product product = store.poll();
//模拟消费处理过程
Thread.sleep(500);
System.out.println("消费组" + Thread.currentThread().getName()
+ "消费资源:" + product.id);
STORE_LOCK.notifyAll();//唤醒等待的生产者
catch (InterruptedException e)
e.printStackTrace();
3.利用Lock 和 Condition实现
/**
* @Description: 1)使用Lock 和 Condition解决生产者消费者问题
* 2)相比于BlockingQueue实现,性能降低,原因BlockingQueue内部使用2个锁,可以生产线程和消费线程并行
* 而lock和wait notify实现都只有一个锁,生产线程和消费线程互斥
* @Author:
* @Date: 2019/5/27 0:42
*/
public class PCByLock
public static void main(String[] args)
Factory factory = new Factory(4);
new Thread(factory.NewProducer()).start();
new Thread(factory.NewProducer()).start();
new Thread(factory.NewConsuner()).start();
new Thread(factory.NewConsuner()).start();
public static class Factory
private final AtomicInteger increNo = new AtomicInteger(0);
//用于存储产品
private final Queue<Product> store = new LinkedList<>();
Lock lock = new ReentrantLock();
Condition producerCondition = lock.newCondition();
Condition consumerCondition = lock.newCondition();
int capacity;
public Factory(int capacity)
this.capacity = capacity;
public Producer NewProducer()
return new Producer();
public Consuner NewConsuner()
return new Consuner();
private class Product
public int id;
public Product(int id)
this.id = id;
private class Producer implements Runnable
@Override
public void run()
while (true)
//模拟生产处理过程
try
Thread.sleep(500);
catch (InterruptedException e)
e.printStackTrace();
lock.lock();
try
//store满了,生产进入阻塞等待,生产线程进入可运行状态,等待cpu调度
while (store.size() == capacity)
producerCondition.await();
Product product = new Product(increNo.getAndIncrement());
store.offer(product);
System.out.println("生产者" + Thread.currentThread().getName()
+ "生产资源:" + product.id);
//唤醒等待的消费者
consumerCondition.signalAll();
catch (InterruptedException e)
e.printStackTrace();
finally
lock.unlock();
private class Consuner implements Runnable
@Override
public void run()
while (true)
lock.lock();
try
//store空了,消费线程进入阻塞等待,消费线程进入可运行状态,等待cpu调度
while (store.size() == 0)
consumerCondition.await();
Product product = store.poll();
//模拟消费处理过程
Thread.sleep(500);
System.out.println("消费组" + Thread.currentThread().getName()
+ "消费资源:" + product.id);
producerCondition.signalAll();//唤醒等待的生产者
catch (InterruptedException e)
e.printStackTrace();
finally
lock.unlock();
4.利用Semaphore实现
**
* @Description: 使用Semaphore实现生产者消费者模型
* @Author:
* @Date: 2019/5/27 0:30
*/
public class PCBySemaphore
public static void main(String[] args)
Factory factory = new Factory(4);
new Thread(factory.NewProducer()).start();
new Thread(factory.NewProducer()).start();
new Thread(factory.NewConsuner()).start();
new Thread(factory.NewConsuner()).start();
public static class Factory
/***
* full 产品容量
* empty 空余容量
* mutex 读写锁
*/
private static Semaphore full, empty, mutex;
private final AtomicInteger increNo = new AtomicInteger(0);
//用于存储产品
private final Queue<Product> store = new LinkedList<>();
int capacity;
public Factory(int capacity)
this.capacity = capacity;
/**
* full 初始化0个产品
* empty 初始化有N个空余位置放置产品
* mutex 初始化每次最多只有一个线程可以读写(生产、消费)
* */
full = new Semaphore(0);
empty = new Semaphore(capacity);
mutex = new Semaphore(1);
public Producer NewProducer()
return new Producer();
public Consumer NewConsuner()
return new Consumer();
private class Product
public int id;
public Product(int id)
this.id = id;
private class Producer implements Runnable
@Override
public void run()
while (true)
try
//模拟生产处理过程
Thread.sleep(500);
empty.acquire();//等待空位
mutex.acquire();//等待读写锁
Product product = new Product(increNo.getAndIncrement());
store.offer(product);
System.out.println("生产者" + Thread.currentThread().getName()
+ "生产资源:" + product.id);
mutex.release();//释放读写锁
full.release();//放置产品
catch (InterruptedException e)
e.printStackTrace();
//消费者类
private class Consumer implements Runnable
@Override
public void run()
while (true)
try
full.acquire();//等待产品
mutex.acquire();//等待读写锁
Product product = store.poll();
//模拟消费处理过程
Thread.sleep(500);
System.out.println("消费组" + Thread.currentThread().getName()
+ "消费资源:" + product.id);
mutex.release();//释放读写锁
empty.release();//释放空位
catch (InterruptedException e)
e.printStackTrace();
以上是关于java多线程笔记--生产者消费组模型的主要内容,如果未能解决你的问题,请参考以下文章