线程通信之生产者消费者模型

Posted 奔跑在梦想的道路上

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程通信之生产者消费者模型相关的知识,希望对你有一定的参考价值。

  线程通信,是指线程之间的消息传递。

  多个线程在操作同一个资源时,它们对共享资源的操作动作可能不同;它们共享同一个资源,互为条件,相互依赖,相互通信,从而让任务向前推进。

  另外,在线程的同步策略中,虽然可以解决并发更新同一个资源,保障资源的安全,但不能用来实现线程间的消息传递。因此,线程通信与线程同步往往会融合使用。

  生产者消费者模型堪称是线程通信中的一个典型案例,我们接下来通过生产者消费者模式来进一步认识线程通信。在此,我们先对若干概念进行了解。  

  生产者:没有生产之前通知消费者等待,生产产品结束之后,马上通知消费者消费

  消费者:没有消费之前通知生产者等待,消费产品结束之后,通知生产者继续生产产品以供消费

  线程通信:使用java中超类Object中提供的一些方法:

1 public final void wait();  //注:long timeout=0  表示线程一直等待,直到其它线程通知
2 public final native void wait(long timeout);   //线程等待指定毫秒参数的时间,超过该时间则不再等待
3 public final void wait(long timeout, int nanos);  //线程等待指定毫秒、微妙的时间,timeout最大等待时间,以毫秒为单位,nanos额外的时间,在纳秒范围0-9999994 public final native void notify();   //唤醒一个处于等待状态的线程
5 public final native void notifyAll();  //唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先运行

  需要注意的是,上述方法只能在同步方法或者同步代码块中使用,否则会抛出异常。

  接下来,我们以生产A-D个产品,放入仓库,待消费者消费后,生产者再进行生产为例,看下生产者消费者模式的运行流程。

  

  1 /**
  2  * 1.共享资源缓存和操作类
  3  */
  4 public class SharedCache {
  5     //产品,此处使用char字符,作为存储共享数据的数据类型
  6     private char cache;
  7     //产品消费标识,是线程间通信的信号,为true表示未消费(生产),false表示未生产(消费)
  8     private boolean flag=false;
  9     /*
 10     生产操作(生产者):向仓库中添加共享数据
 11      */
 12     public synchronized void addSharedCacheData(char data){
 13         //产品未消费,则生产者的生产操作等待
 14         if(flag){
 15             System.out.println("产品未消费,生产者的生产操作等待");
 16             try {
 17                 //生产者等待
 18                 wait();
 19             } catch (InterruptedException e) {
 20                 System.out.println("Thread interrupted Exception:"+e.getMessage());
 21             }
 22         }
 23         //产品已消费,则生产者继续生产
 24         this.cache=data;
 25         //标记已生产
 26         flag=true;
 27         //通知消费者已生产
 28         notify();
 29         System.out.println("生产者--->产品:"+data+"已生产,等待消费者消费");
 30     }
 31     /*
 32     消费操作(消费者):向仓库中获取共享数据
 33      */
 34     public synchronized char getSharedCacheData(){
 35         //如果产品未生产,则消费者等待
 36         if(!flag){
 37             System.out.println("产品未生产,消费者的消费操作等待");
 38             try {
 39                 wait();
 40             } catch (InterruptedException e) {
 41                 System.out.println("Thread interrupted Exception:"+e.getMessage());
 42             }
 43         }
 44         //标记已消费
 45         flag=false;
 46         //通知生产者已消费
 47         notify();
 48         System.out.println("消费者--->产品:"+this.cache+"已消费,通知生产者生产");
 49         return this.cache;
 50     }
 51 }
 52 /**
 53  * 2.生产者线程类
 54  */
 55 public class Producer extends Thread{
 56     //共享缓存资源类的对象
 57     private SharedCache cache;
 58     //构造器,传入共享资源类的对象
 59     public Producer(SharedCache cache){
 60         this.cache=cache;
 61     }
 62     /*
 63     生产者生产产品,放入共享资源缓存类(相当于将生产的产品放入仓库里)
 64     生产A-D类型的产品
 65      */
 66     @Override
 67     public void run() {
 68         for(char product=A;product<=D;product++){
 69             try {
 70                 sleep((int)(Math.random()*3000));
 71             } catch (InterruptedException e) {
 72                 System.out.println("Thread interrupted Exception:"+e.getMessage());
 73             }
 74             //生产产品,放入共享缓存数据类的对象里(相当于把生产的产品放到仓库里)
 75             cache.addSharedCacheData(product);
 76         }
 77     }
 78 }
 79 /**
 80  * 3.消费者线程类
 81  */
 82 public class Consumer extends Thread{
 83     //共享缓存资源类的对象
 84     private SharedCache cache;
 85     //构造器,传入共享资源类的对象
 86     public Consumer(SharedCache cache){
 87         this.cache=cache;
 88     }
 89     /*
 90     消费者消费产品,获取共享缓存类的对象里的数据(相当于从仓库里提取产品)
 91     当消费到D类型的产品时即停止消费
 92      */
 93     @Override
 94     public void run() {
 95         char product=a;
 96         do{
 97             try {
 98                 Thread.sleep((int)(Math.random()*3000));
 99             } catch (InterruptedException e) {
100                 System.out.println("Thread interrupted Exception:"+e.getMessage());
101             }
102             //消费,从仓库取走商品
103             product=cache.getSharedCacheData();
104         }while (product!=D);
105     }
106 }
107 /**
108  * 4.线程通信测试类
109  */
110 public class Test {
111     public static void main(String[] args) {
112         //生产者与消费者共享同一个资源
113         SharedCache cache = new SharedCache();
114         //启动消费者线程
115         new Consumer(cache).start();
116         //启动生产者线程
117         new Producer(cache).start();
118     }
119 }

  运行上述的测试类后,执行结果如下:

产品未生产,消费者的消费操作等待
生产者--->产品:A已生产,等待消费者消费
消费者--->产品:A已消费,通知生产者生产
生产者--->产品:B已生产,等待消费者消费
消费者--->产品:B已消费,通知生产者生产
生产者--->产品:C已生产,等待消费者消费
产品未消费,生产者的生产操作等待
消费者--->产品:C已消费,通知生产者生产
生产者--->产品:D已生产,等待消费者消费
消费者--->产品:D已消费,通知生产者生产

  我们在上面完成的生产者消费者模型,在处理线程同步问题时,主要是用了synchronized同步方法,JDK 1.5提供了多线程升级方案,将同步synchronized替换成了显示的Lock操作,可以实现唤醒、冻结指定的线程。

  接口Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。Lock 可以支持多个相关的 Condition 对象,从而在使用中更加灵活。

  接口Condition可以替代传统的线程间通信,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll()。该对象可以通过Lock锁进行获取。可以说,传统线程的通信方式,Condition都可以实现。

  需要注意的是,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。  

  Java.util.concurrent.lock 中的Lock 框架是锁定的一个抽象,它允许把锁定的实现作为 Java 类,从而为Lock 的多种实现留下了空间,各种实现可能有不同的调度算法、性能特性或者锁定语义。

  其中,ReentrantLock 类实现了Lock ,它拥有与synchronized 相同的并发性和内存语义,还添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。

  我们接下来通过ReentrantLock 类和Condition接口的实现类来完成一个生产者消费者模型。为此,我们需要创建一个ReentrantLock类的多态对象,即建立一把锁,然后将这把锁与两个Condition对象关联。我们接下来就用Lock与Condition实现一个生产者消费者模型,实现与上述例子相似的效果,代码具体如下:

  1 import java.util.concurrent.locks.Condition;
  2 import java.util.concurrent.locks.Lock;
  3 import java.util.concurrent.locks.ReentrantLock;
  4 /**
  5  * 共享的资源
  6  */
  7 public class Resource {
  8     private char product;
  9 //    private int count = 1;
 10     //产品消费标识,是线程间通信的信号,为true表示未消费(生产),false表示未生产(消费)
 11     private boolean flag = false;
 12     //定义一个实现Lock接口的ReentrantLock类对象
 13     private Lock lock = new ReentrantLock();
 14     /*
 15     Condition是被绑定到Lock上的,
 16     要创建一个Lock的Condition,
 17     必须用Lock对象的newCondition()方法
 18      */
 19     private Condition cond_pro = lock.newCondition();
 20     //一个lock可以有多个相关的condition
 21     private Condition cond_con = lock.newCondition();
 22     /*
 23         定义生产方法
 24      */
 25     public void produce(char product) throws InterruptedException {
 26         lock.lock();//手动加同步锁
 27         try {
 28             while (flag) {//此时若生产完一个以后唤醒了另一个生产者,则再次判断,避免两个生产者同时生产
 29                 System.out.println("产品未消费,生产者的生产操作等待");
 30                 cond_pro.await();
 31             }
 32             this.product = product;
 33             //标记已生产
 34             flag = true;
 35             //通知消费者已生产
 36             cond_con.signal();//唤醒消费方法,利用了condition的signal()指定唤醒对象
 37             System.out.println("生产者"+Thread.currentThread().getName()+"--->产品:"+product+"已生产,等待消费者消费");
 38         } finally {
 39             lock.unlock();//释放锁
 40         }
 41     }
 42     /*
 43         定义消费方法
 44      */
 45     public void consume() throws InterruptedException {
 46         lock.lock();
 47         try {
 48             while (!flag) {
 49                 System.out.println("产品未生产,消费者的消费操作等待");
 50                 cond_con.await();
 51             }
 52             //标记已消费
 53             flag = false;
 54             //通知生产者已消费
 55             cond_pro.signal();
 56             System.out.println("消费者"+Thread.currentThread().getName()+"--->产品:"+this.product+"已消费,通知生产者生产");
 57         } finally {
 58             lock.unlock();
 59         }
 60     }
 61 }
 62 /**
 63  * 生产者
 64  */
 65 public class Producer implements Runnable{
 66     private Resource res;
 67     public Producer(Resource res){
 68         this.res=res;
 69     }
 70     @Override
 71     public void run() {
 72         char product=A;
 73         while(product<E){
 74             try {
 75                 res.produce(product);
 76             } catch (InterruptedException e) {
 77                 e.printStackTrace();
 78             }
 79             product++;
 80         }
 81     }
 82 }
 83 /**
 84  * 消费者
 85  */
 86 public class Consumer implements Runnable{
 87     private Resource res;
 88     public Consumer(Resource res){
 89         this.res=res;
 90     }
 91     @Override
 92     public void run() {
 93         char product=A;
 94         while(product<E){
 95             try {
 96                 res.consume();
 97             } catch (InterruptedException e) {
 98                 e.printStackTrace();
 99             }
100             product++;
101         }
102     }
103 }
104 /**
105  * 用ReentrantLock和Condition实现生产者消费者模型
106  */
107 public class Test {
108     //入口方法
109     public static void main(String[] args) {
110         Resource res = new Resource();//生产者与消费者共享的资源
111         Producer producer = new Producer(res);//生产者
112         Consumer consumer = new Consumer(res);//消费者
113         //生产者线程与消费者线程各创建两个
114         Thread p1 = new Thread(producer);
115         Thread p2 = new Thread(producer);
116         Thread c1 = new Thread(consumer);
117         Thread c2 = new Thread(consumer);
118         p1.start();
119         p2.start();
120         c1.start();
121         c2.start();
122     }
123 }

  上述代码执行结果如下:

生产者Thread-0--->产品:A已生产,等待消费者消费
产品未消费,生产者的生产操作等待
消费者Thread-2--->产品:A已消费,通知生产者生产
产品未生产,消费者的消费操作等待
生产者Thread-1--->产品:A已生产,等待消费者消费
产品未消费,生产者的生产操作等待
消费者Thread-2--->产品:A已消费,通知生产者生产
产品未生产,消费者的消费操作等待
生产者Thread-0--->产品:B已生产,等待消费者消费
产品未消费,生产者的生产操作等待
消费者Thread-3--->产品:B已消费,通知生产者生产
产品未生产,消费者的消费操作等待
生产者Thread-1--->产品:B已生产,等待消费者消费
产品未消费,生产者的生产操作等待
消费者Thread-2--->产品:B已消费,通知生产者生产
产品未生产,消费者的消费操作等待
生产者Thread-0--->产品:C已生产,等待消费者消费
产品未消费,生产者的生产操作等待
消费者Thread-3--->产品:C已消费,通知生产者生产
产品未生产,消费者的消费操作等待
生产者Thread-1--->产品:C已生产,等待消费者消费
产品未消费,生产者的生产操作等待
消费者Thread-2--->产品:C已消费,通知生产者生产
生产者Thread-0--->产品:D已生产,等待消费者消费
消费者Thread-3--->产品:D已消费,通知生产者生产
产品未生产,消费者的消费操作等待
生产者Thread-1--->产品:D已生产,等待消费者消费
消费者Thread-3--->产品:D已消费,通知生产者生产  

 

以上是关于线程通信之生产者消费者模型的主要内容,如果未能解决你的问题,请参考以下文章

RTX线程通信之——信号量

RTX线程通信之——信号量

RTX线程通信之——信号量

41. 线程间同步通信-生产者消费者模型

41. 线程间同步通信-生产者消费者模型

41. 线程间同步通信-生产者消费者模型