java利用多线程实现生产者和消费者功能————美的掉渣的代码

Posted 思思博士

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java利用多线程实现生产者和消费者功能————美的掉渣的代码相关的知识,希望对你有一定的参考价值。

1.使用wait()/notifyAll实现生产者和消费者

 1 /**
 2  * 锁对象类
 3  * 协作类
 4  */
 5 public class MyQueue {
 6     private Queue<Integer> queue;
 7     private int limit;
 8 
 9     public MyQueue(int limit) {
10         this.queue = new ArrayDeque<>(limit);
11         this.limit = limit;
12     }
13 
14     public synchronized void put(int n) throws InterruptedException {
15         while (queue.size()==limit){
16             wait();
17         }
18         queue.add(n);
19         /**
20          * 通知所有queue上的线程(消费者线程,生产者线程)
21          */
22         notifyAll();
23     }
24     public synchronized int get() throws InterruptedException {
25         while (queue.isEmpty()){
26             wait();
27         }
28         int e=queue.poll();
29         /**
30          * 通知所有queue上的线程(消费者线程,生产者线程)
31          */
32         notifyAll();
33         return e;
34     }
35 }

 

 1 /**
 2  * 生产者线程
 3  */
 4 public class ProduceThread extends Thread {
 5     private MyQueue queue;
 6 
 7     public ProduceThread(MyQueue queue) {
 8         this.queue = queue;
 9     }
10 
11     @Override
12     public void run() {
13         int num=0;
14         while (true){
15             try {
16                 System.out.println("生产数据:" + num);
17                 queue.put(num);
18                 num++;
19                 Thread.sleep((int)(Math.random()*100));
20             } catch (InterruptedException e) {
21                 e.printStackTrace();
22             }
23         }
24     }
25 }

 

 1 /**
 2  * 消费者线程
 3  */
 4 public class ConsumerThread extends Thread {
 5     private MyQueue queue;
 6 
 7     public ConsumerThread(MyQueue queue) {
 8         this.queue = queue;
 9     }
10 
11     @Override
12     public void run() {
13        while (true){
14            try {
15                int res=queue.get();
16                System.out.println("消费数据:"+res);
17                Thread.sleep((int)(Math.random()*100));
18            } catch (InterruptedException e) {
19                e.printStackTrace();
20            }
21        }
22     }
23 }

 

 1 /**
 2  * 主线程
 3  */
 4 public class MainTest {
 5     public static void main(String[] args) {
 6         MyQueue queue=new MyQueue(100);
 7         new ProduceThread(queue).start();
 8         new ConsumerThread(queue).start();
 9     }
10 }

 

2.使用ReentrantLock和condition实现生产者和消费者功能

 1 /**
 2  * 协作类
 3  */
 4 public class MyBlockQueue {
 5     private Queue<Integer> queue=null;
 6     private int limit;
 7     private Lock lock=new ReentrantLock();
 8     private Condition produceCondition =lock.newCondition();
 9     private Condition ConsumerCondition =lock.newCondition();
10 
11     public MyBlockQueue(int limit) {
12         this.limit = limit;
13         queue=new ArrayDeque<>(limit);
14     }
15     public void put(int n) throws InterruptedException {
16         lock.lockInterruptibly();//可中断锁
17         try {
18             while (queue.size()==limit){
19                 produceCondition.await();
20             }
21             queue.add(n);
22             ConsumerCondition.signal();
23         }finally {
24             lock.unlock();
25         }
26     }
27 
28 
29     public int get() throws InterruptedException {
30         lock.lockInterruptibly();
31         try {
32             while (queue.isEmpty()){
33                 ConsumerCondition.await();
34             }
35             int e=queue.poll();
36             produceCondition.signal();
37             return  e;
38         }finally {
39             lock.unlock();
40         }
41     }
42 }

 

 1 /**
 2  * 生产者线程
 3  */
 4 public class ProduceThread extends Thread {
 5     private MyBlockQueue queue;
 6 
 7     public ProduceThread(MyBlockQueue queue) {
 8         this.queue = queue;
 9     }
10 
11     @Override
12     public void run() {
13         int num=0;
14         while (true){
15             try {
16                 System.out.println("生产数据:" + num);
17                 queue.put(num);
18                 num++;
19                 Thread.sleep((int)(Math.random()*100));
20             } catch (InterruptedException e) {
21                 e.printStackTrace();
22             }
23         }
24     }
25 }

 

 

 1 /**
 2  * 消费者线程
 3  */
 4 public class ConsumerThread extends Thread {
 5     private MyBlockQueue queue;
 6 
 7     public ConsumerThread(MyBlockQueue queue) {
 8         this.queue = queue;
 9     }
10 
11     @Override
12     public void run() {
13        while (true){
14            try {
15                int res=queue.get();
16                System.out.println("消费数据:"+res);
17                Thread.sleep((int)(Math.random()*100));
18            } catch (InterruptedException e) {
19                e.printStackTrace();
20            }
21        }
22     }
23 }

 

 1 /**
 2  * 主线程
 3  */
 4 public class MainTest {
 5     public static void main(String[] args) {
 6         MyBlockQueue queue=new MyBlockQueue(100);
 7         new ProduceThread(queue).start();
 8         new ConsumerThread(queue).start();
 9     }
10 }

 

分析:wait/notify只能实现一个条件队列;准备唤醒生产者队列,却把消费者也唤醒了;相互竞争后最终生产者满足条件,开始执行,消费者再次变成等待状态。

而使用显示锁,则可以创建多条等待队列。

 

以上是关于java利用多线程实现生产者和消费者功能————美的掉渣的代码的主要内容,如果未能解决你的问题,请参考以下文章

java多线程笔记--生产者消费组模型

Java多线程15:QueueBlockingQueue以及利用BlockingQueue实现生产者/消费者模型

Java多线程(实现多线程线程同步生产者消费者)

java 多线程并发系列之 生产者消费者模式的两种实现

Java多线程-Lock锁的使用,以及生产者和消费者的实现

java-Runnable加锁实现生产者和消费者的多线程问题