Java深层系列「并发编程系列」让我们一起探索一下CompletionService的技术原理和使用指南

Posted 浩宇天尚

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java深层系列「并发编程系列」让我们一起探索一下CompletionService的技术原理和使用指南相关的知识,希望对你有一定的参考价值。

CompletionService基本介绍

  • CompletionService与ExecutorService类似都可以用来执行线程池的任务,ExecutorService继承了Executor接口,而CompletionService则是一个接口。
  • 主要是Executor的特性决定的,Executor框架不能完全保证任务执行的异步性,那就是如果需要实现任务(task)的异步性,只要为每个task创建一个线程就实现了任务的异步性。

在高并发的情况下,不断创建线程异步执行任务将会极大增大线程创建的开销、造成极大的资源消耗和影响系统的稳定性。另外,Executor框架还支持同步任务的执行,就是在execute方法中调用提交任务的run()方法就属于同步调用,当我们采用异步的时候,需要进行的就是获取Future对象,之后在需要使用的时候get出来结果即可。

异步调用判断机制

一般情况下,如果需要判断任务是否完成,思路是得到Future列表的每个Future,然后反复调用其get方法,并将timeout参数设为0,从而通过轮询的方式判断任务是否完成。为了更精确实现任务的异步执行以及更简便的完成任务的异步执行,可以使用CompletionService

CompletionService实现原理

CompletionService实际上可以看做是Executor和BlockingQueue的结合体。CompletionService在接收到要执行的任务时,通过类似BlockingQueue的put和take获得任务执行的结果。CompletionService的一个实现是ExecutorCompletionService,ExecutorCompletionService把具体的计算任务交给Executor完成。

QueueingFuture的源码如下

  • ExecutorCompletionService在构造函数中会创建一个BlockingQueue(使用的基于链表的无界队列LinkedBlockingQueue),该BlockingQueue的作用是保存Executor执行的结果。当计算完成时,调用FutureTask的done方法。
  • 当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。
private class QueueingFuture extends FutureTask<Void> 
QueueingFuture(RunnableFuture<V> task)
super(task, null);
this.task = task;

protected void done() completionQueue.add(task);
private final Future<V> task;

CompletionService将提交的任务转化为QueueingFuture,并且覆盖了done方法,在done方法中就是将任务加入任务队列中。

使用ExecutorService实现任务

比如:电商中加载商品详情这一操作,因为商品属性的多样性,将商品的图片显示与商品简介的显示设为两个独立执行的任务。
另外,由于商品的图片可能有许多张,所以图片的显示往往比简介显示更慢。这个时候异步执行能够在一定程度上加快执行的速度提高系统的性能。

public class DisplayProductInfoWithExecutorService 
//线程池
private final ExecutorService executorService = Executors.newFixedThreadPool(2);
//日期格式器
private final DateFormat format = new SimpleDateFormat("HH:mm:ss");
// 由于可能商品的图片可能会有很多张,所以显示商品的图片往往会有一定的延迟
// 除了商品的详情外还包括商品简介等信息的展示,由于这里信息主要的是文字为
// 主,所以能够比图片更快显示出来。下面的代码就以执行这两个任务为主线,完
// 成这两个任务的执行。由于这两个任务的执行存在较大差距,所以想到的第一个
// 思路就是异步执行,首先执行图像的下载任务,之后(不会很久)开始执行商品
// 简介信息的展示,如果网络足够好,图片又不是很大的情况下,可能在开始展示
// 商品的时候图像就下载完成了,所以自然想到使用Executor和Callable完成异
// 步任务的执行。

public void renderProductDetail()
final List<ProductInfo> productInfos = loadProductImages();
//异步下载图像的任务
Callable<List<ProductImage>> task = new Callable<List<ProductImage>>()
@Override
public List<ProductImage> call() throws Exception
List<ProductImage> imageList = new ArrayList<>();
for (ProductInfo info : productInfos)
imageList.add(info.getImage());

return imageList;

;
//提交给线程池执行
Future<List<ProductImage>> listFuture = executorService.submit(task);
//展示商品简介的信息
renderProductText(productInfos);
try
//显示商品的图片
List<ProductImage> imageList = listFuture.get();
renderProductImage(imageList);
catch (InterruptedException e)
// 如果显示图片发生中断异常则重新设置线程的中断状态
// 这样做可以让wait中的线程唤醒
Thread.currentThread().interrupt();
// 同时取消任务的执行,参数false表示在线程在执行不中断
listFuture.cancel(true);
catch (ExecutionException e)
try
throw new Throwable(e.getCause());
catch (Throwable throwable)
throwable.printStackTrace();





private void renderProductImage(List<ProductImage> imageList )
for (ProductImage image : imageList)
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();


System.out.println(Thread.currentThread().getName() + " display products images! "
+ format.format(new Date()));


private void renderProductText(List<ProductInfo> productInfos)
for (ProductInfo info : productInfos)
try
Thread.sleep(50);
catch (InterruptedException e)
e.printStackTrace();


System.out.println(Thread.currentThread().getName() + " display products description! "
+ format.format(new Date()));


private List<ProductInfo> loadProductImages()
List<ProductInfo> list = new ArrayList<>();
try
TimeUnit.SECONDS.sleep(5);
catch (InterruptedException e)
e.printStackTrace();

ProductInfo info = new ProductInfo();
info.setImage(new ProductImage());
list.add(info);
System.out.println(Thread.currentThread().getName() + " load products info! "
+ format.format(new Date()));
return list;


/**
* 商品
*/
private static class ProductInfo
private ProductImage image;

public ProductImage getImage()
return image;


public void setImage(ProductImage image)
this.image = image;



private static class ProductImage

深入浅出Java并发编程指南「源码原理系列」让我们一起探索一下CyclicBarrier的技术原理和源码分析

#私藏项目实操分享# Java深层系列「技术盲区」让我们一起探索一下Netty(Java)底层的“零拷贝Zero-Copy”技术(上)

#私藏项目实操分享#Java深层系列「技术盲区」让我们一起去挑战一下如何读取一个较大或者超大的文件数据!

深入浅出Java并发编程指南「难点 - 核心 - 遗漏」让我们一起探索一下CyclicBarrier的技术原理和源码分析

深入浅出Java并发编程指南「难点 - 核心 - 遗漏」让我们一起探索一下CountDownLatch的技术原理和源码分析

深入浅出Java并发编程指南「难点 - 核心 - 遗漏」让我们一起探索一下CompletionService的技术原理和使用指南