并行模式之生产者-消费者模式
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并行模式之生产者-消费者模式相关的知识,希望对你有一定的参考价值。
参考技术A 生产者-消费者模式是一种经典的多线程设计模式。它通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责处理生产者提交的任务。两者线程通过共享内存缓冲区进行通信。生产者线程将任务提交到共享内存缓冲区,消费者线程并不直接与生产者线程通信,而是在共享内存缓冲区中获取任务,并进行处理。共享内存缓冲区是其核心组件,它负责生产者和消费者之间的通信,避免两种直接通信,生产者和消费者都不需要知道对方的存在。由于共享内存缓冲区的存在,它允许生产者和消费者在执行速度上存在时间差,无论哪一方的速度高于对方,都可以通过共享内存缓冲区得到缓解,保证系统正常运行。
生产者-消费者模式的主要角色及作用:
生产者:用于提交用户请求,提取用户任务,并装入内存缓冲区
消费者:在内存缓冲区中提取处理任务
内存缓冲区:缓存生产者提交的任务或数据,供消费者使用
任务:生产者向内存缓冲区提交的数据结构
Main:使用生产者和消费者的客户端
BlockingQueue充当了共享内存缓存区,用于维护任务或数据队列。PCData对象表示一个生产任务或数据。生产者和消费者对象均引用同一个BlockingQueue对象实例,生产者负责创建PCData对象,并将它加入到BlockingQueue中,消费者则从BlockingQueue中获取PCData。
--参考文献《实战Java高并发程序设计》
并行设计模式-- Future模式
Java多线程编程中,常用的多线程设计模式包括:Future模式、Master-Worker模式、Guarded Suspeionsion模式、不变模式和生产者-消费者模式等。这篇文章主要讲述Future模式,关于其他多线程设计模式的地址如下:
关于Master-Worker模式的详解: 并行设计模式(二)-- Master-Worker模式
关于Guarded Suspeionsion模式的详解: 并行设计模式(三)-- Guarded Suspeionsion模式
关于不变模式的详解: 并行设计模式(四)-- 不变模式
关于生产者-消费者模式的详解:并行设计模式(五)-- 生产者-消费者模式
1. Future模式
Future模式的核心在于:去除了主函数的等待时间,并使得原本需要等待的时间段可以用于处理其他业务逻辑。
Future模式有点类似于商品订单。在网上购物时,提交订单后,在收货的这段时间里无需一直在家里等候,可以先干别的事情。类推到程序设计中时,当提交请求时,期望得到答复时,如果这个答复可能很慢。传统的是一直持续等待直到这个答复收到之后再去做别的事情,但如果利用Future模式,其调用方式改为异步,而原先等待返回的时间段,在主调用函数中,则可以用于处理其他事务。
例如如下的请求调用过程时序图。当call请求发出时,需要很长的时间才能返回。左边的图需要一直等待,等返回数据后才能继续其他操作;而右边的Future模式的图中客户端则无需等到可以做其他的事情。服务器段接收到请求后立即返回结果给客户端,这个结果并不是真实的结果(是虚拟的结果),也就是先获得一个假数据,然后执行其他操作。
Future模式的主要参与者如下表所示:
参 与 者 |
作 用 |
Main | 系统启动,调用Client发出请求 |
Client | 返回Data对象,立即返回FutureData,并开启ClientThread线程装配RealData |
Data | 返回数据的接口 |
FutureData | Future数据,构造很快,但是是一个虚拟的数据,需要装配RealData |
RealData | 真实数据,其构造是比较慢的 |
2. Future模式的代码实现
应用实例对应模式结构图如下所示:
<1. Main函数的实现
Main函数主要负责调用Client发起请求,并使用返回的数据:
public class Main {
public static void main(String[] args) {
Client client = new Client();
// 这里会立即返回,因为获取的是FutureData,而非RealData
Data data = client.request("name");
System.out.println("请求完毕");
try {
// 这里可以用一个sleep代替对其他业务逻辑的处理
// 在处理这些业务逻辑过程中,RealData也正在创建,从而充分了利用等待时间
Thread.sleep(2000);
// 使用真实数据
System.out.println("数据=" + data.getResult());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
<2. Client的实现
Client主要实现了获取futrueData,开启构造RealData的线程,并在接受请求后,很快地返回FutureData
public class Client {
public Data request(final String string) {
final FutureData futureData = new FutureData();
new Thread(new Runnable() {
@Override
public void run() {
// RealData的构建很慢,所以放在单独的线程中运行
RealData realData = new RealData(string);
futureData.setRealData(realData);
}
}).start();
return futureData; // 先直接返回FutureData
}
}
<3. Data的实现
Data是一个接口,提供了getResult()方法。无论futureData或者RealData都实现了这个接口
public interface Data {
String getResult() throws InterruptedException;
}
<4. FutureData的实现
FutureData实现了一个快速返回的RealData包装。它只是一个包装,或者说是一个RealData的虚拟实现。因此,它可以很快被构造并返回。当使用FutureData的getResult()方法是,程序会阻塞,等待RealData被注入到程序中,才使用RealData的getResult()方法返回。
1 public class FutureData implements Data {
2 RealData realData = null; // FutureData是RealData的封装
3 boolean isReady = false; // 是否已经准备好
4
5 public synchronized void setRealData(RealData realData) {
6 if (isReady)
7 return;
8 this.realData = realData;
9 isReady = true;
10 notifyAll(); // RealData已经被注入到FutureData中了,通知getResult()方法
11 }
12
13 @Override
14 public String getResult() throws InterruptedException {
15 if (!isReady) {
16 wait(); // 一直等到RealData注入到FutureData中
17 }
18 return realData.getResult();
19 }
20 }
<5. RealData的实现
RealData是最终需要使用的数据模型,它的构造很慢。在这里,使用sleep()函数模拟这个过程
public class RealData implements Data {
protected String data;
public RealData(String data) {
// 利用sleep方法来表示RealData构造过程是非常缓慢的
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.data = data;
}
@Override
public String getResult() throws InterruptedException {
return data;
}
}
3. JDK的内置Future模式实现
由于Future是非常常用的多线程设计模式,因此在JDK中内置了Future模式的实现。这些类在java.util.concurrent包里面。其中最为重要的是FutureTask类,它实现了Runnable接口,作为单独的线程运行。在其run()方法中,通过Sync内部类调用Callable接口,并维护Callable接口的返回对象。当使用FutureTask.get()方法时,将返回Callable接口的返回对象。其核心结构图如下所示:
JDK内置的Future模式功能强大,除了基本的功能外,它还可以取消Future任务,或者设定future任务的超时时间。Callable接口是一个用户自定义的实现。在应用程序中,通过实现Callable接口的call()方法,指定FutureTask的实际工作内容和返回对象。
Future接口提供的线程控制功能有:
1 boolean cancle(boolean mayInterruptIfRunning); // 取消任务
2 boolean isCancelled(); // 是否已经取消
3 boolean isDone(); // 是否已经完成
4 V get() throws InterruptedException, ExecutionException; //取得返回对象
5 V get(long timeout, TimeUnit unit); //取得返回对象,可以设置超时时间
同样,针对上述的实例,如果使用JDK自带的实现,则需要作一些调整。
首先,需要实现Callable接口,实现具体的业务逻辑。在本例中,依然使用RealData来实现这个接口:
1 public class RealData implements Callable<String> {
2 private String para;
3
4 public RealData(String para) {
5 this.para = para;
6 }
7
8 @Override
9 public String call() throws Exception {
10 // 利用sleep方法来表示真是业务是非常缓慢的
11 StringBuffer sb = new StringBuffer();
12 for (int i = 0; i < 10; i++) {
13 sb.append(para);
14 try {
15 Thread.sleep(1000);
16 } catch (InterruptedException e) {
17 e.printStackTrace();
18 }
19 }
20 return sb.toString();
21 }
22 }
在这个改进中,RealData的构造变动非常快,因为其主要业务逻辑被移动到call()方法内,并通过call()方法返回。
Main方法修改如下,由于使用了JDK的内置框架,Data、FutureData等对象就不再需要了。在Main方法的实现中,直接通过RealData构造FutureTask,并将其作为单独的线程运行。在提交请求后,执行其他业务逻辑,最后通过FutureTask.get()方法,得到RealData的执行结果。
1 public class Main {
2 public static void main(String[] args) {
3 FutureTask<String> future = new FutureTask<String>(new RealData("liangyongxing"));
4 ExecutorService executor = Executors.newFixedThreadPool(1); // 使用线程池
5 //执行FutureTask,相当于上例中的client.request("name")发送请求
6 //在这里开启线程进行RealData的call()执行
7 executor.submit(future);
8 System.out.println("请求完毕");
9
10 try {
11 // 这里仍然可以做额外的数据操作,这里使用sleep代替其他业务逻辑的处理
12 Thread.sleep(2000);
13
14 /**
15 * 相当于上例当中的 data.getResult(),取得call()方法的返回值
16 * 如果此时call()方法没有执行完毕,则依然会等待
17 */
18 System.out.println("数据 = " + future.get());
19 } catch (InterruptedException | ExecutionException e) {
20 e.printStackTrace();
21 } finally {
executor.shutdown();
}
22 }
23 }
以上是关于并行模式之生产者-消费者模式的主要内容,如果未能解决你的问题,请参考以下文章
Java--多线程之生产者消费者模式;线程池ExecutorService
并行程序设计模式-Master-Worker模式-Guarded Suspension模式-不变模式-生产者-消费者模式的理解