java并发编程实战读书笔记 ExecutorCompletionService
Posted 郭梧悠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java并发编程实战读书笔记 ExecutorCompletionService相关的知识,希望对你有一定的参考价值。
当我们向Executor提交一组任务,并且希望任务在完成后获得结果,此时可以考虑使用ExecutorCompletionService。
ExecutorCompletionService实现了CompletionService接口。ExecutorCompletionService将Executor和BlockingQueue功能融合在一起,使用它可以提交我们的Callable任务。这个任务委托给Executor执行,可以使用ExecutorCompletionService对象的take和poll方法获取结果。其中BlockingQueue负责保存计算完成的结果,其基本结构如下:
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
public ExecutorCompletionService(Executor executor) {
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
}
ExecutorCompletionService对象使用submit提交任务,任务同样返回一个Future。且在执行任务的时候,Task会被封装成RunnableFuture,RunnableFuture是一个接口,可以是FutureTask,也可以是其他实现了RunnableFuture的类型。进而封装成QueueingFuture交给executor执行。
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture<V>(f, completionQueue));
return f;
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
QueueingFuture是什么呢?它是FutureTask的子类,主要是重写了FutureTask的done方法,将计算结果放到阻塞队列里。
private static class QueueingFuture<V> extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task,
BlockingQueue<Future<V>> completionQueue) {
super(task, null);
this.task = task;
this.completionQueue = completionQueue;
}
//在此处task可以看作是FutureTask.
private final Future<V> task;
private final BlockingQueue<Future<V>> completionQueue;
//将计算结果放到阻塞队列里
protected void done() { completionQueue.add(task); }
}
注意我们放进队列的是FutureTask.我们从ExecutorCompletionService的take方法里获取到的是一个FutureTask对象,如果想要获取到最终结果,需要调用FutureTask的get方法:
Result r = ecs.take().get();
所以我们可以使用如下代码来使用CompletionService,创建ExecutorCompletionService对象;使用Executor提交任务;使用ExecutorCompletionService的take方法获取FutureTask,然后调用FutureTask的get方法获取结果。假设你有一些任务希望并发的执行,并且向往将结果交给某方法执行,则可以用如下代码来实现:
void solve(Executor e, Collection<Callable<Result>> solvers) {
CompletionService<Result> ecs
= new ExecutorCompletionService<Result>(e);
//循环提交每一个任务
for (Callable<Result> s : solvers)
ecs.submit(s);
int n = solvers.size();
for (int i = 0; i < n; ++i) {
Result r = ecs.take().get();
if (r != null)
use(r);
}
}}
上面我们在for循环里使用ecs.submit(s)来提交一个任务。ecs.submit返回的是一个Future<Result>
对象,所以我们提交了多少次,就产生了多少个Future<Result>
对象。所以我们可以使用List集合将这些Future<Result>
保存起来,可以解决如下的应用场景,假设您希望使用第一个非空结果,忽略所有遇到异常的任务,以及在第一个任务准备就绪时取消所有其他任务。那么如下代码可以帮助到你。
void solve(Executor e,
Collection<Callable<Result>> solvers)
throws InterruptedException {
CompletionService<Result> ecs
= new ExecutorCompletionService<Result>(e);
int n = solvers.size();
//用来保存执行的结果Future集合
List<Future<Result>> futures = new ArrayList<>(n);
//提交任务保存结果
for (Callable<Result> s : solvers)
futures.add(ecs.submit(s));
Result result = null;
try {
for (int i = 0; i < n; ++i) {
try {
//获取任务,当有一个任务完成时就退出循环
Result r = ecs.take().get();
if (r != null) {
result = r;
break;
}
} catch (ExecutionException ignore) {}
}
}
finally {
//取消其他任务
for (Future<Result> f : futures)
f.cancel(true);
}
if (result != null)
use(result);
}}
以上是关于java并发编程实战读书笔记 ExecutorCompletionService的主要内容,如果未能解决你的问题,请参考以下文章
java并发编程实战读书笔记 ExecutorCompletionService