多线程生产者/消费者模式实现

Posted qf123

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程生产者/消费者模式实现相关的知识,希望对你有一定的参考价值。

参考书籍《java多线程编程核心技术》

都是基于wait/notify实现的

一个生产者和一个消费者:操作值

1 package com.qf.test10.pojo;
2 
3 /**
4  * @author qf
5  * @create 2018-09-18 15:59
6  */
7 public class Entity {
8     public static String value = "";
9 }
 1 package com.qf.test10;
 2 
 3 import com.qf.test10.pojo.Entity;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 15:52
 8  * 生产者类
 9  */
10 public class Producer {
11     private String lock;
12 
13     public Producer(String lock) {
14         this.lock = lock;
15     }
16 
17     public void setValue(){
18         try {
19             synchronized (lock){
20                 if(!Entity.value.equals("")){
21                     lock.wait();
22                 }
23                 String value = System.currentTimeMillis()+"_"+System.nanoTime();
24                 System.out.println("set的值是"+value);
25                 Entity.value = value;
26                 lock.notify();
27             }
28         } catch (InterruptedException e) {
29             e.printStackTrace();
30         }
31     }
32 }
 1 package com.qf.test10;
 2 
 3 import com.qf.test10.pojo.Entity;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 15:52
 8  * 消费者类
 9  */
10 public class Consumer {
11     private String lock;
12 
13     public Consumer(String lock) {
14         this.lock = lock;
15     }
16 
17     public void getValue(){
18         try {
19             synchronized (lock){
20                 if(Entity.value.equals("")){
21                     lock.wait();
22                 }
23                 System.out.println("get的值"+Entity.value);
24                 Entity.value = "";
25                 lock.notify();
26             }
27         } catch (InterruptedException e) {
28             e.printStackTrace();
29         }
30     }
31 }

线程类

 1 package com.qf.test10.thread;
 2 
 3 import com.qf.test10.Producer;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 16:08
 8  */
 9 public class ThreadP extends Thread {
10     private Producer producer;
11 
12     public ThreadP(Producer producer) {
13         this.producer = producer;
14     }
15 
16     @Override
17     public void run() {
18         while(true) {
19             producer.setValue();
20         }
21     }
22 }
 1 package com.qf.test10.thread;
 2 
 3 import com.qf.test10.Consumer;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 16:11
 8  */
 9 public class ThreadC extends Thread {
10     private Consumer consumer;
11 
12     public ThreadC(Consumer consumer) {
13         this.consumer = consumer;
14     }
15 
16     @Override
17     public void run() {
18         while (true) {
19             consumer.getValue();
20         }
21     }
22 }

测试运行

 1 package com.qf.test10;
 2 
 3 import com.qf.test10.thread.ThreadC;
 4 import com.qf.test10.thread.ThreadP;
 5 
 6 /**
 7  * @author qf
 8  * @create 2018-09-18 16:12
 9  */
10 public class Run {
11     public static void main(String[] args) {
12         String lock = new String("");
13         Producer p = new Producer(lock);
14         Consumer c = new Consumer(lock);
15         ThreadP tp = new ThreadP(p);
16         tp.start();
17         ThreadC tc = new ThreadC(c);
18         tc.start();
19     }
20 }

打印输出

set的值是1537259244097_800479975994656
get的值1537259244097_800479975994656
set的值是1537259244097_800479976020503
get的值1537259244097_800479976020503
set的值是1537259244097_800479976042246
get的值1537259244097_800479976042246
set的值是1537259244097_800479976062349
get的值1537259244097_800479976062349
set的值是1537259244097_800479976083272
get的值1537259244097_800479976083272
set的值是1537259244097_800479976103785
get的值1537259244097_800479976103785
set的值是1537259244097_800479976124298
get的值1537259244097_800479976124298
set的值是1537259244097_800479976144400
get的值1537259244097_800479976144400
.............

如果以此为基础,设计多个生产者和多个消费者,那么运行过程中很可能会发生假死的情况,也就是所有线程都呈现等待的状态

多个生产者与多个消费者:操作值

修改Producer.java,Consumer.java以及测试类

 1 package com.qf.test10;
 2 
 3 import com.qf.test10.pojo.Entity;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 15:52
 8  * 生产者类
 9  */
10 public class Producer {
11     private String lock;
12 
13     public Producer(String lock) {
14         this.lock = lock;
15     }
16 
17     public void setValue(){
18         try {
19             synchronized (lock){
20                 while (!Entity.value.equals("")){
21                     System.out.println("生产者 "+Thread.currentThread().getName()+" WAITING了★");
22                     lock.wait();
23                 }
24                 System.out.println("生产者 "+Thread.currentThread().getName()+" RUNNABLE了");
25                 String value = System.currentTimeMillis()+"_"+System.nanoTime();
26                 //System.out.println("set的值是"+value);
27                 Entity.value = value;
28                 lock.notify();
29             }
30         } catch (InterruptedException e) {
31             e.printStackTrace();
32         }
33     }
34 }
 1 package com.qf.test10;
 2 
 3 import com.qf.test10.pojo.Entity;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 15:52
 8  * 消费者类
 9  */
10 public class Consumer {
11     private String lock;
12 
13     public Consumer(String lock) {
14         this.lock = lock;
15     }
16 
17     public void getValue(){
18         try {
19             synchronized (lock){
20                 if(Entity.value.equals("")){
21                     System.out.println("消费者 "+Thread.currentThread().getName()+" WAITING了☆");
22                     lock.wait();
23                 }
24                 System.out.println("消费者 "+Thread.currentThread().getName()+" RUNNABLE了");
25                 //System.out.println("get的值"+Entity.value);
26                 Entity.value = "";
27                 lock.notify();
28             }
29         } catch (InterruptedException e) {
30             e.printStackTrace();
31         }
32     }
33 }
 1 package com.qf.test10;
 2 
 3 import com.qf.test10.thread.ThreadC;
 4 import com.qf.test10.thread.ThreadP;
 5 
 6 /**
 7  * @author qf
 8  * @create 2018-09-18 16:12
 9  */
10 public class Run {
11     public static void main(String[] args) throws InterruptedException {
12         String lock = new String("");
13         Producer p = new Producer(lock);
14         Consumer c = new Consumer(lock);
15         /*ThreadP tp = new ThreadP(p);
16         tp.start();
17         ThreadC tc = new ThreadC(c);
18         tc.start();*/
19         ThreadP[] threadPS = new ThreadP[2];
20         ThreadC[] threadCS = new ThreadC[2];
21         for (int i = 0; i < 2; i++) {
22             threadPS[i] = new ThreadP(p);
23             threadPS[i].setName("生产者"+(i+1));
24             threadPS[i].start();
25             threadCS[i] = new ThreadC(c);
26             threadCS[i].setName("消费者"+(i+1));
27             threadCS[i].start();
28         }
29 
30         Thread.sleep(5000);
31         Thread[] threads = new Thread[Thread.currentThread().getThreadGroup().activeCount()];
32         Thread.currentThread().getThreadGroup().enumerate(threads);
33         for (int i = 0; i < threads.length; i++) {
34             System.out.println(threads[i].getName()+" "+threads[i].getState());
35         }
36     }
37 }

打印结果

生产者 生产者1 RUNNABLE了
生产者 生产者1 WAITING了★
生产者 生产者2 WAITING了★
消费者 消费者1 RUNNABLE了
消费者 消费者1 WAITING了☆
生产者 生产者1 RUNNABLE了
生产者 生产者1 WAITING了★
生产者 生产者2 WAITING了★
消费者 消费者2 RUNNABLE了
消费者 消费者2 WAITING了☆
消费者 消费者1 RUNNABLE了
消费者 消费者1 WAITING了☆
生产者 生产者1 RUNNABLE了
生产者 生产者1 WAITING了★
生产者 生产者2 WAITING了★
main RUNNABLE
Monitor Ctrl-Break RUNNABLE
生产者1 WAITING
消费者1 WAITING
生产者2 WAITING
消费者2 WAITING

主要原因是因为notify可能唤醒的是同类(生产者唤醒生产者,消费者唤醒消费者)。最终导致所有线程都处于WAITING状态,程序进而呈现假死状态

只要将Producer和Consumer中的notify修改为notifyAll即可,这样就不至于出现假死状态

一个生产者和一个消费者:操作栈

 1 package com.qf.test11.pojo;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 
 6 /**
 7  * @author qf
 8  * @create 2018-09-18 17:14
 9  */
10 public class MyStack {
11     private List list = new ArrayList();
12     synchronized public void push(){
13         try {
14             if (list.size() == 1){
15                 this.wait();
16             }
17             list.add("test"+Math.random());
18             this.notify();
19             System.out.println("push = "+list.size());
20         } catch (InterruptedException e) {
21             e.printStackTrace();
22         }
23     }
24     public synchronized void pop(){
25         try {
26             if(list.size() == 0){
27                 //System.out.println("pop操作: "+Thread.currentThread().getName()+"线程wait状态");
28                 this.wait();
29             }
30             System.out.println("pop操作: "+Thread.currentThread().getName()+"线程,获取值="+list.get(0));
31             list.remove(0);
32             this.notify();
33             System.out.println("pop = "+list.size());
34         } catch (InterruptedException e) {
35             e.printStackTrace();
36         }
37     }
38 }

生产者/消费者

 1 package com.qf.test11;
 2 
 3 import com.qf.test11.pojo.MyStack;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 17:13
 8  * 生产者
 9  */
10 public class Producer {
11     private MyStack myStack;
12 
13     public Producer(MyStack myStack) {
14         this.myStack = myStack;
15     }
16 
17     public void pushService(){
18         myStack.push();
19     }
20 }
 1 package com.qf.test11;
 2 
 3 import com.qf.test11.pojo.MyStack;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 17:14
 8  */
 9 public class Consumer {
10     private MyStack myStack;
11 
12     public Consumer(MyStack myStack) {
13         this.myStack = myStack;
14     }
15     public void popService(){
16         myStack.pop();
17     }
18 }

线程类

 1 package com.qf.test11.thread;
 2 
 3 import com.qf.test11.Producer;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 17:13
 8  */
 9 public class ThreadP extends Thread {
10     private Producer producer;
11 
12     public ThreadP(Producer producer) {
13         this.producer = producer;
14     }
15 
16     @Override
17     public void run() {
18         while (true){
19             producer.pushService();
20         }
21     }
22 }
 1 package com.qf.test11.thread;
 2 
 3 import com.qf.test11.Consumer;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 17:14
 8  */
 9 public class ThreadC extends Thread {
10     private Consumer consumer;
11 
12     public ThreadC(Consumer consumer) {
13         this.consumer = consumer;
14     }
15 
16     @Override
17     public void run() {
18         while (true){
19             consumer.popService();
20         }
21     }
22 }

测试运行

 1 package com.qf.test11;
 2 
 3 import com.qf.test11.pojo.MyStack;
 4 import com.qf.test11.thread.ThreadC;
 5 import com.qf.test11.thread.ThreadP;
 6 
 7 /**
 8  * @author qf
 9  * @create 2018-09-18 17:34
10  */
11 public class Run {
12     public static void main(String[] args) {
13         MyStack myStack = new MyStack();
14         Producer p = new Producer(myStack);
15         Consumer c = new Consumer(myStack);
16         ThreadP tp = new ThreadP(p);
17         ThreadC tc = new ThreadC(c);
18         tp.setName("tp");
19         tc.setName("tc");
20         tp.start();
21         tc.start();
22     }
23 }

打印结果

push = 1
pop操作: tc线程,获取值=test0.8957260024057878
pop = 0
push = 1
pop操作: tc线程,获取值=test0.9236606274738514
pop = 0
push = 1
pop操作: tc线程,获取值=test0.7661156573296891
pop = 0
push = 1
pop操作: tc线程,获取值=test0.6523634151650343
pop = 0
push = 1
pop操作: tc线程,获取值=test0.08728918553111287
pop = 0
push = 1
pop操作: tc线程,获取值=test0.472483808512989
pop = 0
push = 1
pop操作: tc线程,获取值=test0.17456918848050884
pop = 0
push = 1
pop操作: tc线程,获取值=test0.1785536700399648
pop = 0
............

 

以上是关于多线程生产者/消费者模式实现的主要内容,如果未能解决你的问题,请参考以下文章

多线程:生产者/消费者模式

Java的多线程实现生产/消费模式

多线程之生产者消费者模式

java 多线程并发系列之 生产者消费者模式的两种实现

多线程生产者/消费者模式实现

用Python多线程实现生产者消费者模式爬取斗图网的表情图片