多线程生产者/消费者模式实现
Posted qf123
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程生产者/消费者模式实现相关的知识,希望对你有一定的参考价值。
参考书籍《java多线程编程核心技术》
都是基于wait/notify实现的
一个生产者和一个消费者:操作值
1 package com.qf.test10.pojo; 2 3 /** 4 * @author qf 5 * @create 2018-09-18 15:59 6 */ 7 public class Entity { 8 public static String value = ""; 9 }
1 package com.qf.test10; 2 3 import com.qf.test10.pojo.Entity; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 15:52 8 * 生产者类 9 */ 10 public class Producer { 11 private String lock; 12 13 public Producer(String lock) { 14 this.lock = lock; 15 } 16 17 public void setValue(){ 18 try { 19 synchronized (lock){ 20 if(!Entity.value.equals("")){ 21 lock.wait(); 22 } 23 String value = System.currentTimeMillis()+"_"+System.nanoTime(); 24 System.out.println("set的值是"+value); 25 Entity.value = value; 26 lock.notify(); 27 } 28 } catch (InterruptedException e) { 29 e.printStackTrace(); 30 } 31 } 32 }
1 package com.qf.test10; 2 3 import com.qf.test10.pojo.Entity; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 15:52 8 * 消费者类 9 */ 10 public class Consumer { 11 private String lock; 12 13 public Consumer(String lock) { 14 this.lock = lock; 15 } 16 17 public void getValue(){ 18 try { 19 synchronized (lock){ 20 if(Entity.value.equals("")){ 21 lock.wait(); 22 } 23 System.out.println("get的值"+Entity.value); 24 Entity.value = ""; 25 lock.notify(); 26 } 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 } 31 }
线程类
1 package com.qf.test10.thread; 2 3 import com.qf.test10.Producer; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 16:08 8 */ 9 public class ThreadP extends Thread { 10 private Producer producer; 11 12 public ThreadP(Producer producer) { 13 this.producer = producer; 14 } 15 16 @Override 17 public void run() { 18 while(true) { 19 producer.setValue(); 20 } 21 } 22 }
1 package com.qf.test10.thread; 2 3 import com.qf.test10.Consumer; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 16:11 8 */ 9 public class ThreadC extends Thread { 10 private Consumer consumer; 11 12 public ThreadC(Consumer consumer) { 13 this.consumer = consumer; 14 } 15 16 @Override 17 public void run() { 18 while (true) { 19 consumer.getValue(); 20 } 21 } 22 }
测试运行
1 package com.qf.test10; 2 3 import com.qf.test10.thread.ThreadC; 4 import com.qf.test10.thread.ThreadP; 5 6 /** 7 * @author qf 8 * @create 2018-09-18 16:12 9 */ 10 public class Run { 11 public static void main(String[] args) { 12 String lock = new String(""); 13 Producer p = new Producer(lock); 14 Consumer c = new Consumer(lock); 15 ThreadP tp = new ThreadP(p); 16 tp.start(); 17 ThreadC tc = new ThreadC(c); 18 tc.start(); 19 } 20 }
打印输出
set的值是1537259244097_800479975994656
get的值1537259244097_800479975994656
set的值是1537259244097_800479976020503
get的值1537259244097_800479976020503
set的值是1537259244097_800479976042246
get的值1537259244097_800479976042246
set的值是1537259244097_800479976062349
get的值1537259244097_800479976062349
set的值是1537259244097_800479976083272
get的值1537259244097_800479976083272
set的值是1537259244097_800479976103785
get的值1537259244097_800479976103785
set的值是1537259244097_800479976124298
get的值1537259244097_800479976124298
set的值是1537259244097_800479976144400
get的值1537259244097_800479976144400
.............
如果以此为基础,设计多个生产者和多个消费者,那么运行过程中很可能会发生假死的情况,也就是所有线程都呈现等待的状态
多个生产者与多个消费者:操作值
修改Producer.java,Consumer.java以及测试类
1 package com.qf.test10; 2 3 import com.qf.test10.pojo.Entity; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 15:52 8 * 生产者类 9 */ 10 public class Producer { 11 private String lock; 12 13 public Producer(String lock) { 14 this.lock = lock; 15 } 16 17 public void setValue(){ 18 try { 19 synchronized (lock){ 20 while (!Entity.value.equals("")){ 21 System.out.println("生产者 "+Thread.currentThread().getName()+" WAITING了★"); 22 lock.wait(); 23 } 24 System.out.println("生产者 "+Thread.currentThread().getName()+" RUNNABLE了"); 25 String value = System.currentTimeMillis()+"_"+System.nanoTime(); 26 //System.out.println("set的值是"+value); 27 Entity.value = value; 28 lock.notify(); 29 } 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 } 33 } 34 }
1 package com.qf.test10; 2 3 import com.qf.test10.pojo.Entity; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 15:52 8 * 消费者类 9 */ 10 public class Consumer { 11 private String lock; 12 13 public Consumer(String lock) { 14 this.lock = lock; 15 } 16 17 public void getValue(){ 18 try { 19 synchronized (lock){ 20 if(Entity.value.equals("")){ 21 System.out.println("消费者 "+Thread.currentThread().getName()+" WAITING了☆"); 22 lock.wait(); 23 } 24 System.out.println("消费者 "+Thread.currentThread().getName()+" RUNNABLE了"); 25 //System.out.println("get的值"+Entity.value); 26 Entity.value = ""; 27 lock.notify(); 28 } 29 } catch (InterruptedException e) { 30 e.printStackTrace(); 31 } 32 } 33 }
1 package com.qf.test10; 2 3 import com.qf.test10.thread.ThreadC; 4 import com.qf.test10.thread.ThreadP; 5 6 /** 7 * @author qf 8 * @create 2018-09-18 16:12 9 */ 10 public class Run { 11 public static void main(String[] args) throws InterruptedException { 12 String lock = new String(""); 13 Producer p = new Producer(lock); 14 Consumer c = new Consumer(lock); 15 /*ThreadP tp = new ThreadP(p); 16 tp.start(); 17 ThreadC tc = new ThreadC(c); 18 tc.start();*/ 19 ThreadP[] threadPS = new ThreadP[2]; 20 ThreadC[] threadCS = new ThreadC[2]; 21 for (int i = 0; i < 2; i++) { 22 threadPS[i] = new ThreadP(p); 23 threadPS[i].setName("生产者"+(i+1)); 24 threadPS[i].start(); 25 threadCS[i] = new ThreadC(c); 26 threadCS[i].setName("消费者"+(i+1)); 27 threadCS[i].start(); 28 } 29 30 Thread.sleep(5000); 31 Thread[] threads = new Thread[Thread.currentThread().getThreadGroup().activeCount()]; 32 Thread.currentThread().getThreadGroup().enumerate(threads); 33 for (int i = 0; i < threads.length; i++) { 34 System.out.println(threads[i].getName()+" "+threads[i].getState()); 35 } 36 } 37 }
打印结果
生产者 生产者1 RUNNABLE了 生产者 生产者1 WAITING了★ 生产者 生产者2 WAITING了★ 消费者 消费者1 RUNNABLE了 消费者 消费者1 WAITING了☆ 生产者 生产者1 RUNNABLE了 生产者 生产者1 WAITING了★ 生产者 生产者2 WAITING了★ 消费者 消费者2 RUNNABLE了 消费者 消费者2 WAITING了☆ 消费者 消费者1 RUNNABLE了 消费者 消费者1 WAITING了☆ 生产者 生产者1 RUNNABLE了 生产者 生产者1 WAITING了★ 生产者 生产者2 WAITING了★ main RUNNABLE Monitor Ctrl-Break RUNNABLE 生产者1 WAITING 消费者1 WAITING 生产者2 WAITING 消费者2 WAITING
主要原因是因为notify可能唤醒的是同类(生产者唤醒生产者,消费者唤醒消费者)。最终导致所有线程都处于WAITING状态,程序进而呈现假死状态
只要将Producer和Consumer中的notify修改为notifyAll即可,这样就不至于出现假死状态
一个生产者和一个消费者:操作栈
1 package com.qf.test11.pojo; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 6 /** 7 * @author qf 8 * @create 2018-09-18 17:14 9 */ 10 public class MyStack { 11 private List list = new ArrayList(); 12 synchronized public void push(){ 13 try { 14 if (list.size() == 1){ 15 this.wait(); 16 } 17 list.add("test"+Math.random()); 18 this.notify(); 19 System.out.println("push = "+list.size()); 20 } catch (InterruptedException e) { 21 e.printStackTrace(); 22 } 23 } 24 public synchronized void pop(){ 25 try { 26 if(list.size() == 0){ 27 //System.out.println("pop操作: "+Thread.currentThread().getName()+"线程wait状态"); 28 this.wait(); 29 } 30 System.out.println("pop操作: "+Thread.currentThread().getName()+"线程,获取值="+list.get(0)); 31 list.remove(0); 32 this.notify(); 33 System.out.println("pop = "+list.size()); 34 } catch (InterruptedException e) { 35 e.printStackTrace(); 36 } 37 } 38 }
生产者/消费者
1 package com.qf.test11; 2 3 import com.qf.test11.pojo.MyStack; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 17:13 8 * 生产者 9 */ 10 public class Producer { 11 private MyStack myStack; 12 13 public Producer(MyStack myStack) { 14 this.myStack = myStack; 15 } 16 17 public void pushService(){ 18 myStack.push(); 19 } 20 }
1 package com.qf.test11; 2 3 import com.qf.test11.pojo.MyStack; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 17:14 8 */ 9 public class Consumer { 10 private MyStack myStack; 11 12 public Consumer(MyStack myStack) { 13 this.myStack = myStack; 14 } 15 public void popService(){ 16 myStack.pop(); 17 } 18 }
线程类
1 package com.qf.test11.thread; 2 3 import com.qf.test11.Producer; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 17:13 8 */ 9 public class ThreadP extends Thread { 10 private Producer producer; 11 12 public ThreadP(Producer producer) { 13 this.producer = producer; 14 } 15 16 @Override 17 public void run() { 18 while (true){ 19 producer.pushService(); 20 } 21 } 22 }
1 package com.qf.test11.thread; 2 3 import com.qf.test11.Consumer; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 17:14 8 */ 9 public class ThreadC extends Thread { 10 private Consumer consumer; 11 12 public ThreadC(Consumer consumer) { 13 this.consumer = consumer; 14 } 15 16 @Override 17 public void run() { 18 while (true){ 19 consumer.popService(); 20 } 21 } 22 }
测试运行
1 package com.qf.test11; 2 3 import com.qf.test11.pojo.MyStack; 4 import com.qf.test11.thread.ThreadC; 5 import com.qf.test11.thread.ThreadP; 6 7 /** 8 * @author qf 9 * @create 2018-09-18 17:34 10 */ 11 public class Run { 12 public static void main(String[] args) { 13 MyStack myStack = new MyStack(); 14 Producer p = new Producer(myStack); 15 Consumer c = new Consumer(myStack); 16 ThreadP tp = new ThreadP(p); 17 ThreadC tc = new ThreadC(c); 18 tp.setName("tp"); 19 tc.setName("tc"); 20 tp.start(); 21 tc.start(); 22 } 23 }
打印结果
push = 1 pop操作: tc线程,获取值=test0.8957260024057878 pop = 0 push = 1 pop操作: tc线程,获取值=test0.9236606274738514 pop = 0 push = 1 pop操作: tc线程,获取值=test0.7661156573296891 pop = 0 push = 1 pop操作: tc线程,获取值=test0.6523634151650343 pop = 0 push = 1 pop操作: tc线程,获取值=test0.08728918553111287 pop = 0 push = 1 pop操作: tc线程,获取值=test0.472483808512989 pop = 0 push = 1 pop操作: tc线程,获取值=test0.17456918848050884 pop = 0 push = 1 pop操作: tc线程,获取值=test0.1785536700399648 pop = 0 ............
以上是关于多线程生产者/消费者模式实现的主要内容,如果未能解决你的问题,请参考以下文章