并行模式之生产者-消费者模式

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并行模式之生产者-消费者模式相关的知识,希望对你有一定的参考价值。

参考技术A 生产者-消费者模式是一种经典的多线程设计模式。它通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责处理生产者提交的任务。两者线程通过共享内存缓冲区进行通信。

生产者线程将任务提交到共享内存缓冲区,消费者线程并不直接与生产者线程通信,而是在共享内存缓冲区中获取任务,并进行处理。共享内存缓冲区是其核心组件,它负责生产者和消费者之间的通信,避免两种直接通信,生产者和消费者都不需要知道对方的存在。由于共享内存缓冲区的存在,它允许生产者和消费者在执行速度上存在时间差,无论哪一方的速度高于对方,都可以通过共享内存缓冲区得到缓解,保证系统正常运行。

生产者-消费者模式的主要角色及作用:

生产者:用于提交用户请求,提取用户任务,并装入内存缓冲区

消费者:在内存缓冲区中提取处理任务

内存缓冲区:缓存生产者提交的任务或数据,供消费者使用

任务:生产者向内存缓冲区提交的数据结构

Main:使用生产者和消费者的客户端

BlockingQueue充当了共享内存缓存区,用于维护任务或数据队列。PCData对象表示一个生产任务或数据。生产者和消费者对象均引用同一个BlockingQueue对象实例,生产者负责创建PCData对象,并将它加入到BlockingQueue中,消费者则从BlockingQueue中获取PCData。

--参考文献《实战Java高并发程序设计》

并行设计模式-- Future模式

  Java多线程编程中,常用的多线程设计模式包括:Future模式、Master-Worker模式、Guarded Suspeionsion模式、不变模式和生产者-消费者模式等。这篇文章主要讲述Future模式,关于其他多线程设计模式的地址如下:
  关于Master-Worker模式的详解: 并行设计模式(二)-- Master-Worker模式
  关于Guarded Suspeionsion模式的详解: 并行设计模式(三)-- Guarded Suspeionsion模式
  关于不变模式的详解: 并行设计模式(四)-- 不变模式
  关于生产者-消费者模式的详解:并行设计模式(五)-- 生产者-消费者模式

1. Future模式

  Future模式的核心在于:去除了主函数的等待时间,并使得原本需要等待的时间段可以用于处理其他业务逻辑。

  Future模式有点类似于商品订单。在网上购物时,提交订单后,在收货的这段时间里无需一直在家里等候,可以先干别的事情。类推到程序设计中时,当提交请求时,期望得到答复时,如果这个答复可能很慢。传统的是一直持续等待直到这个答复收到之后再去做别的事情,但如果利用Future模式,其调用方式改为异步,而原先等待返回的时间段,在主调用函数中,则可以用于处理其他事务。

  例如如下的请求调用过程时序图。当call请求发出时,需要很长的时间才能返回。左边的图需要一直等待,等返回数据后才能继续其他操作;而右边的Future模式的图中客户端则无需等到可以做其他的事情。服务器段接收到请求后立即返回结果给客户端,这个结果并不是真实的结果(是虚拟的结果),也就是先获得一个假数据,然后执行其他操作。

 

  Future模式的主要参与者如下表所示:

   参 与 者  

                     作  用                      
 Main  系统启动,调用Client发出请求
 Client  返回Data对象,立即返回FutureData,并开启ClientThread线程装配RealData
 Data    返回数据的接口
 FutureData  Future数据,构造很快,但是是一个虚拟的数据,需要装配RealData
 RealData  真实数据,其构造是比较慢的

2. Future模式的代码实现

应用实例对应模式结构图如下所示:

    

<1. Main函数的实现

  Main函数主要负责调用Client发起请求,并使用返回的数据:

public class Main {
    public static void main(String[] args) {
        Client client = new Client();
        // 这里会立即返回,因为获取的是FutureData,而非RealData
        Data data = client.request("name");
        System.out.println("请求完毕");

        try {
            // 这里可以用一个sleep代替对其他业务逻辑的处理
            // 在处理这些业务逻辑过程中,RealData也正在创建,从而充分了利用等待时间
            Thread.sleep(2000);

            // 使用真实数据
            System.out.println("数据=" + data.getResult());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

<2. Client的实现

  Client主要实现了获取futrueData,开启构造RealData的线程,并在接受请求后,很快地返回FutureData

public class Client {
    public Data request(final String string) {
        final FutureData futureData = new FutureData();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // RealData的构建很慢,所以放在单独的线程中运行
                RealData realData = new RealData(string);
                futureData.setRealData(realData);
            }
        }).start();
        return futureData; // 先直接返回FutureData
    }
}

<3. Data的实现

  Data是一个接口,提供了getResult()方法。无论futureData或者RealData都实现了这个接口

public interface Data {
    String getResult() throws InterruptedException;
}

<4. FutureData的实现

  FutureData实现了一个快速返回的RealData包装。它只是一个包装,或者说是一个RealData的虚拟实现。因此,它可以很快被构造并返回。当使用FutureData的getResult()方法是,程序会阻塞,等待RealData被注入到程序中,才使用RealData的getResult()方法返回。

 1 public class FutureData implements Data {
 2     RealData realData = null; // FutureData是RealData的封装
 3     boolean isReady = false; // 是否已经准备好
 4 
 5     public synchronized void setRealData(RealData realData) {
 6         if (isReady)
 7             return;
 8         this.realData = realData;
 9         isReady = true;
10         notifyAll(); // RealData已经被注入到FutureData中了,通知getResult()方法
11     }
12 
13     @Override
14     public String getResult() throws InterruptedException {
15         if (!isReady) {
16             wait(); // 一直等到RealData注入到FutureData中
17         }
18         return realData.getResult();
19     }
20 }

<5. RealData的实现

  RealData是最终需要使用的数据模型,它的构造很慢。在这里,使用sleep()函数模拟这个过程

public class RealData implements Data {
    protected String data;

    public RealData(String data) {
        // 利用sleep方法来表示RealData构造过程是非常缓慢的
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.data = data;
    }

    @Override
    public String getResult() throws InterruptedException {
        return data;
    }
}

3. JDK的内置Future模式实现

  由于Future是非常常用的多线程设计模式,因此在JDK中内置了Future模式的实现。这些类在java.util.concurrent包里面。其中最为重要的是FutureTask类,它实现了Runnable接口,作为单独的线程运行。在其run()方法中,通过Sync内部类调用Callable接口,并维护Callable接口的返回对象。当使用FutureTask.get()方法时,将返回Callable接口的返回对象。其核心结构图如下所示:

    

  JDK内置的Future模式功能强大,除了基本的功能外,它还可以取消Future任务,或者设定future任务的超时时间。Callable接口是一个用户自定义的实现。在应用程序中,通过实现Callable接口的call()方法,指定FutureTask的实际工作内容和返回对象。

  Future接口提供的线程控制功能有:

1 boolean cancle(boolean mayInterruptIfRunning);  // 取消任务
2 boolean isCancelled();  // 是否已经取消
3 boolean isDone();    // 是否已经完成
4 V get() throws InterruptedException, ExecutionException;  //取得返回对象
5 V get(long timeout, TimeUnit unit);  //取得返回对象,可以设置超时时间

  同样,针对上述的实例,如果使用JDK自带的实现,则需要作一些调整。

  首先,需要实现Callable接口,实现具体的业务逻辑。在本例中,依然使用RealData来实现这个接口:

 1 public class RealData implements Callable<String> {
 2     private String para;
 3 
 4     public RealData(String para) {
 5         this.para = para;
 6     }
 7 
 8     @Override
 9     public String call() throws Exception {
10         // 利用sleep方法来表示真是业务是非常缓慢的
11         StringBuffer sb = new StringBuffer();
12         for (int i = 0; i < 10; i++) {
13             sb.append(para);
14             try {
15                 Thread.sleep(1000);
16             } catch (InterruptedException e) {
17                 e.printStackTrace();
18             }
19         }
20         return sb.toString();
21     }
22 }

  在这个改进中,RealData的构造变动非常快,因为其主要业务逻辑被移动到call()方法内,并通过call()方法返回。

  Main方法修改如下,由于使用了JDK的内置框架,Data、FutureData等对象就不再需要了。在Main方法的实现中,直接通过RealData构造FutureTask,并将其作为单独的线程运行。在提交请求后,执行其他业务逻辑,最后通过FutureTask.get()方法,得到RealData的执行结果。

 1 public class Main {
 2     public static void main(String[] args) {
 3         FutureTask<String> future = new FutureTask<String>(new RealData("liangyongxing"));
 4         ExecutorService executor = Executors.newFixedThreadPool(1);    // 使用线程池
 5         //执行FutureTask,相当于上例中的client.request("name")发送请求
 6         //在这里开启线程进行RealData的call()执行
 7         executor.submit(future);
 8         System.out.println("请求完毕");
 9 
10         try {
11             // 这里仍然可以做额外的数据操作,这里使用sleep代替其他业务逻辑的处理
12             Thread.sleep(2000);
13             
14             /**
15              * 相当于上例当中的 data.getResult(),取得call()方法的返回值
16              * 如果此时call()方法没有执行完毕,则依然会等待
17              */
18             System.out.println("数据 = " + future.get());
19         } catch (InterruptedException | ExecutionException e) {
20             e.printStackTrace();
21         } finally {
              executor.shutdown();
          } 22 } 23 }

 

以上是关于并行模式之生产者-消费者模式的主要内容,如果未能解决你的问题,请参考以下文章

Java--多线程之生产者消费者模式;线程池ExecutorService

并行程序设计模式-Master-Worker模式-Guarded Suspension模式-不变模式-生产者-消费者模式的理解

JUC - 多线程之Synchronized和Lock锁;生产者消费者模式

Java并发程序设计设计模式与并发之生产者-消费者模式

设计模式之生产者消费者模式

并行设计模式-- Future模式