java并发编程实战读书笔记 ExecutorCompletionService

Posted 郭梧悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java并发编程实战读书笔记 ExecutorCompletionService相关的知识,希望对你有一定的参考价值。

当我们向Executor提交一组任务,并且希望任务在完成后获得结果,此时可以考虑使用ExecutorCompletionService。
ExecutorCompletionService实现了CompletionService接口。ExecutorCompletionService将Executor和BlockingQueue功能融合在一起,使用它可以提交我们的Callable任务。这个任务委托给Executor执行,可以使用ExecutorCompletionService对象的take和poll方法获取结果。其中BlockingQueue负责保存计算完成的结果,其基本结构如下:

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

      public ExecutorCompletionService(Executor executor) {
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
}    

ExecutorCompletionService对象使用submit提交任务,任务同样返回一个Future。且在执行任务的时候,Task会被封装成RunnableFuture,RunnableFuture是一个接口,可以是FutureTask,也可以是其他实现了RunnableFuture的类型。进而封装成QueueingFuture交给executor执行。


 public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }

  private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

QueueingFuture是什么呢?它是FutureTask的子类,主要是重写了FutureTask的done方法,将计算结果放到阻塞队列里。

   private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        //在此处task可以看作是FutureTask.
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        //将计算结果放到阻塞队列里
        protected void done() { completionQueue.add(task); }
    }

注意我们放进队列的是FutureTask.我们从ExecutorCompletionService的take方法里获取到的是一个FutureTask对象,如果想要获取到最终结果,需要调用FutureTask的get方法:

Result r = ecs.take().get();

所以我们可以使用如下代码来使用CompletionService,创建ExecutorCompletionService对象;使用Executor提交任务;使用ExecutorCompletionService的take方法获取FutureTask,然后调用FutureTask的get方法获取结果。假设你有一些任务希望并发的执行,并且向往将结果交给某方法执行,则可以用如下代码来实现:

void solve(Executor e, Collection<Callable<Result>> solvers) {
    
     CompletionService<Result> ecs
       = new ExecutorCompletionService<Result>(e);
    //循环提交每一个任务   
    for (Callable<Result> s : solvers)
      ecs.submit(s);
      
    int n = solvers.size();
    for (int i = 0; i < n; ++i) {
      Result r = ecs.take().get();
      if (r != null)
       use(r);
    }
  }}

上面我们在for循环里使用ecs.submit(s)来提交一个任务。ecs.submit返回的是一个Future<Result>对象,所以我们提交了多少次,就产生了多少个Future<Result>对象。所以我们可以使用List集合将这些Future<Result>保存起来,可以解决如下的应用场景,假设您希望使用第一个非空结果,忽略所有遇到异常的任务,以及在第一个任务准备就绪时取消所有其他任务。那么如下代码可以帮助到你。

 void solve(Executor e,
             Collection<Callable<Result>> solvers)
      throws InterruptedException {
      
    CompletionService<Result> ecs
        = new ExecutorCompletionService<Result>(e);
    int n = solvers.size();
    //用来保存执行的结果Future集合
    List<Future<Result>> futures = new ArrayList<>(n);
    //提交任务保存结果
    for (Callable<Result> s : solvers)
        futures.add(ecs.submit(s));
    Result result = null;
    try {
        
      for (int i = 0; i < n; ++i) {
       try {
          //获取任务,当有一个任务完成时就退出循环
          Result r = ecs.take().get();
          if (r != null) {
            result = r;
            break;
          }
        } catch (ExecutionException ignore) {}
      }
    }
    finally {
      //取消其他任务
      for (Future<Result> f : futures)
        f.cancel(true);
    }
 
    if (result != null)
      use(result);
  }}

以上是关于java并发编程实战读书笔记 ExecutorCompletionService的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程实战读书笔记之死锁

Java并发编程实战读书笔记之死锁

java并发编程实战读书笔记 ExecutorCompletionService

java并发编程实战读书笔记 ExecutorCompletionService

java并发编程实战读书笔记之FutureTask

java并发编程实战读书笔记之FutureTask