java生产者与消费者模式

Posted Yrion

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java生产者与消费者模式相关的知识,希望对你有一定的参考价值。

前言: 生产者和消费者模式是我们在学习多线程中很经典的一个模式,它主要分为生产者和消费者,分别是两个线程,

目录

一:生产者和消费者模式简介

二:生产者和消费者模式的实现

声明:本例来源于java经典著作:《Think in java》,接下来将会采用本例子将会借鉴其中的案例进行分析

首先我们来设想有一个这样的场景:一个饭店里有一个做饭的厨师和来吃饭的人,服务员负责端食物,这里就可以把厨师当做生产者,(这里暂且把服务员当做消费者),而食物则有这样的过程,被厨师生产出来,然后被服务员消费。当食物存在的时候,厨师等待,不再进行生产,服务员进行消费。当食物为空的时候,厨师开始生产食物,服务员等待。这中间就存在着一个线程之间协作的过程。我们来通过代码进行模拟:

 

首先是新建两个线程,一个厨师线程,一个服务员线程,双方进行协作:

以下是厨师线程:

public class Chef  implements  Runnable{ //product

    private  Restaurant restaurant;

    private  int count=0;

    public Chef(Restaurant restaurant) {
        this.restaurant = restaurant;
    }

    @Override
    public void run() {

        try {
            while (!Thread.interrupted()) {

                synchronized (this) {

                    while (restaurant.meal != null) {

                        wait();
                    }
                    if (++count==10){

                        System.out.println("out of food,closing");
                        restaurant.exec.shutdownNow();
                    }
                    System.out.println("Chef product meal"+count);
                    synchronized (restaurant.waitPerson){
                       restaurant.meal= new Meal(count);
                       restaurant.waitPerson.notifyAll();
                    }
                    TimeUnit.MICROSECONDS.sleep(100);
                }

            }

        }catch (InterruptedException e){
            System.out.println("Chef interruped");

        }

    }
}

 

以下是服务员线程:

public class WaitPerson implements  Runnable{ 

    private  Restaurant restaurant;

    public WaitPerson(Restaurant restaurant) {
        this.restaurant = restaurant;
    }


    @Override
    public void run() {

        try{
            while (!Thread.interrupted()) {
                synchronized (this) {
                    while (restaurant.meal == null) {
                        wait();
                    }
                    System.out.println("waitPerson got:"+restaurant.meal);
                }
                synchronized (restaurant.chef){
                    restaurant.meal=null;
                    restaurant.chef.notifyAll();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println("waitPerson interrupdate");
        }

    }
}

 我们来分析一下厨师线程,主要看它的run方法,里面包含着一个try、catch块,首先它会一直捕获线程的状态,当它不处于interrupted(线程中断,此时无法运转)异常时,往下走。然后取得当前对象的锁,把它上锁,通过一个while循环,取得餐馆里面的meal,判断其是否为null.当餐馆里面的餐还有剩余的时候,此时生产者不需要工作,它处于wait状态,注意wait和Thread.sleep的区别,sleep方法运行的时候线程是不会丢弃锁的,而wait方法会释放锁,所以此时,被锁住的对象运行到wait方法,它已经释放了锁,因此消费者可以获取到锁。

再接着count会进行累加1。再接着锁住服务员线程,此时服务员线程获取锁的主动权,它通过notifyAll方法唤醒所有的在锁上等待线程,注意此处为什么是notifyAll而不是notify,这主要是因为此刻厨师线程并不知道等待的线程究竟是几条,为了线程安全起见,唤醒所有的等待线程。注意的是:notifyAll会唤醒所有线程,但是运行的只是其中一个,关于这个线程的筛选,是完全随机的。

唤醒了消费者线程,它就会运行到消费者线程,然后我们来看一下它的run方法,和之前的厨师线程差不多,依然是锁住当前的消费者线程,不过线程此时的条件变成了meal为null。接着再获取厨师的锁,使其meal为null,再唤醒厨师线程,接着程序就会运转到厨师线程,执行厨师的run方法。

再来看看Restaurant类:

public class Restaurant{

  Meal meal;

  ExecutorService exec =Executors.newCachedThreadPool();
  
  WaitPerson waitPerson=new WaitPerson(this);
 
  Chef chef=new Chef(this);
  
  public Restaurant(){

  exec.execute(chef);
  
  exec.execute(waitPerson);

}


public static void main(){

  new Restaurant();

} }

这里创建了一个缓存线程池,然后在Restaurant的构造方法中开始执行线程命令,首先会从Chef开始生产,紧接着服务员消费,通过wait/notify进行线程之间的同步:

程序执行的结果:

Chef product meal1
waitPerson got:Meal1
Chef product meal2
waitPerson got:Meal2
Chef product meal3
waitPerson got:Meal3
Chef product meal4
waitPerson got:Meal4
Chef product meal5
waitPerson got:Meal5
Chef product meal6
waitPerson got:Meal6
Chef product meal7
waitPerson got:Meal7
Chef product meal8
waitPerson got:Meal8
Chef product meal9
waitPerson got:Meal9
out of food,closing
Chef product meal10
Chef interruped
waitPerson interrupdate

可以看到结果,线程有条不紊的运行,生产者每生产一个meal,发出order up信号,然后消费者消费这个meal,再轮到生产者,再到消费者,这就是生产者和消费者的模式的意义,它们之间同步工作,不会出现消费者线程没有meal的时候仍然去消费,生产者在有meal的时候依然去生产,这样就会产生线程安全的问题。

 三:关于线程之间的通信

3.1:wait和notfiy方法

 

 查询jdk可以看出,wait和notify方法都是object的方法,也就是说java所有的对象都可以继承这个方法。每个对象运行的时候都可以关联一个线程,调用wait方法就可以使当前线程处于等待状态,等待的时候它的它会释放掉锁,此时其它对象可以获取线程锁。而wait方法和notify是进行相互沟通的,只有notify/notifyAll方法才能唤醒被wait方法等待的线程,notify会唤醒这个线程。

注意看它的解释:唤醒在此对象监视器上等待的单个/所有线程,那么问题来了。监视器又是什么?

3.2:java监视器

3.2.1:对监视器的解释

监视器:monitor
锁:lock(JVM里只有一种独占方式的lock)
进入监视器:entermonitor
离开/释放监视器:leavemonitor
(entermonitor和leavemonitor是JVM的指令)
拥有者:owner

在JVM里,monitor就是实现lock的方式
entermonitor就是获得某个对象的lock(owner是当前线程)
leavemonitor就是释放某个对象的lock

 

3.2.2:通俗的解释

      上面的这段话比较精简,其实理解起来很简单。有这样一句话:程序即生活,我们把它代入到生活实际中试衣间来思考一下,其实不难理解这段话。我们去一个商城购买衣服,选择好了,我们去试衣间,此刻好比对象entermonitor ,执行进入监视器这个指令,我们离开试衣间,就好比执行leavemonitor指令,这个monitor是一个狭小的空间,当一个对象进入的时候,其它对象就不能进入了。这其中的原理实现了队列等待和竞争的,线程进入会先排队,如果有线程进入监视器,当前线程就会等待,如果没有就直接进入,新来的线程会和已经等待的线程进行竞争,但是最后只有一个能进入,进入监视器的线程可以访问它的所有数据,包括类变量和实例变量。这也就实现了锁的原理。

3.3:线程协作解决的问题

3.3.1:线程数据访问的紊乱

假如没有协作的话,数据之间的访问一定会出现紊乱,比如生产者会可能在有很多meal的时候仍然会生产meal,消费者会在没有meal的时候依然进行消费,这都是我们在多线程的程序中不愿看到的,因为它会有很多不预期的事情发生

3.3.2:数据会错位

数据之间会错位,这体现在生产者会消费meal,而消费者会生产meal。此刻他们不能各自履行各自的职责,这也是我们极度不愿意看到的,而不采用保障措施就会有这样问题的出现。

可以看到

四:总结

 本篇博客主要引用《Thinking in java》中的一段进行讲解,讲了线程之间的通信与协作的问题,如何合理运用wait和notify方法进行线程之间的通信,实现一个系统有条不紊的运行。这其中的最关键点在于理解锁的释放与获取的过程,以及线程的切换,生产者和消费者的相互沟通,协同工作的原理。

以上是关于java生产者与消费者模式的主要内容,如果未能解决你的问题,请参考以下文章

java生产者与消费者模式

Java并发程序设计设计模式与并发之生产者-消费者模式

Java设计模式之观察者模式(生产者与消费者模式)

JAVA模拟生产者与消费者实例

Java并发编程--生产者与消费者模式介绍

Day806.生产者消费者模式 -Java 性能调优实战