执行者完成服务?如果我们有invokeAll,为啥还需要一个?

Posted

技术标签:

【中文标题】执行者完成服务?如果我们有invokeAll,为啥还需要一个?【英文标题】:ExecutorCompletionService? Why do need one if we have invokeAll?执行者完成服务?如果我们有invokeAll,为什么还需要一个? 【发布时间】:2012-08-06 00:51:00 【问题描述】:

如果我们使用ExecutorCompletionService,我们可以将一系列任务作为Callables 提交,并获得与CompletionService 交互的结果作为queue

但也有ExecutorServiceinvokeAll 接受Collection 的任务,我们得到Future 的列表来检索结果。

据我所知,使用其中一个或另一个没有任何好处(除了我们避免使用invokeAllfor 循环,我们将不得不将submit 任务分配给@987654332 @) 本质上它们是相同的想法,但略有不同。

那么为什么有两种不同的方式来提交一系列任务呢?我是否纠正了它们在性能方面是等效的?有没有一种情况比另一种更合适?我想不出一个。

【问题讨论】:

【参考方案1】:

使用ExecutorCompletionService.poll/take,您将在完成时收到Futures,按完成顺序(或多或少)。使用ExecutorService.invokeAll,你没有这个权力;您要么阻塞直到全部完成,要么指定一个超时,在此之后取消不完整的。


static class SleepingCallable implements Callable<String> 

  final String name;
  final long period;

  SleepingCallable(final String name, final long period) 
    this.name = name;
    this.period = period;
  

  public String call() 
    try 
      Thread.sleep(period);
     catch (InterruptedException ex)  
    return name;
  

现在,我将在下面演示invokeAll 的工作原理:

final ExecutorService pool = Executors.newFixedThreadPool(2);
final List<? extends Callable<String>> callables = Arrays.asList(
    new SleepingCallable("quick", 500),
    new SleepingCallable("slow", 5000));
try 
  for (final Future<String> future : pool.invokeAll(callables)) 
    System.out.println(future.get());
  
 catch (ExecutionException | InterruptedException ex)  
pool.shutdown();

这会产生以下输出:

C:\dev\scrap>java CompletionExample
... after 5 s ...
quick
slow

使用CompletionService,我们会看到不同的输出:

final ExecutorService pool = Executors.newFixedThreadPool(2);
final CompletionService<String> service = new ExecutorCompletionService<String>(pool);
final List<? extends Callable<String>> callables = Arrays.asList(
    new SleepingCallable("slow", 5000),
    new SleepingCallable("quick", 500));
for (final Callable<String> callable : callables) 
  service.submit(callable);

pool.shutdown();
try 
  while (!pool.isTerminated()) 
    final Future<String> future = service.take();
    System.out.println(future.get());
  
 catch (ExecutionException | InterruptedException ex)  

这会产生以下输出:

C:\dev\scrap>java CompletionExample
... after 500 ms ...
quick
... after 5 s ...
slow

注意时间是相对于程序开始的,而不是之前的消息。


您可以在here 上找到完整代码。

【讨论】:

所以你是说在从invokeAll 返回的List&lt;Future&gt; 中,如果开始迭代结果,我可以阻止第一个直到它完成,而在ExecutioncCompletion 中我会阻止直到任何一个结果可用?我明白你的意思了吗? +1 是的,没错@user384706。在ExecutorCompletionService 下方是BlockingQueue&lt;Future&lt;V&gt;&gt;,因此您可以等待第一个 作业完成,而不是全部完成。 @user384706 好吧,使用非超时形式返回Futures 后全部完成,无限期阻塞。 @Gray:但在invokeAll我也不等所有人完成 嘿。我从不将赋值放在循环条件中。我猜是个讨厌鬼。好答案。 :-)【参考方案2】:

我实际上从未使用过 ExecutorCompletionService,但我认为这可能比“正常”ExecutorService 更有用的情况是,当您希望按完成顺序接收已完成任务的 Futures 时。使用 invokeAll,您只需获得一个列表,其中可以包含在任何给定时间未完成和已完成的任务。

【讨论】:

【参考方案3】:

那么为什么有两种不同的方式来提交一系列任务呢?我是否正确地认为它们是等效的?有没有一种情况比另一种更合适?我想不出一个。

通过使用ExecutorCompletionService,您可以在每个作业完成时立即收到通知。相比之下,ExecutorService.invokeAll(...) 会等待您的所有个作业完成,然后再返回Futures 的集合。这意味着(例如),如果除了一项作业之外的所有作业都在 10 分钟内完成,而一项作业需要 30 分钟,那么您将在 30 分钟内没有任何结果。

// this waits until _all_ of the jobs complete
List<Future<Object>> futures = threadPool.invokeAll(...);

相反,当您使用ExecutorCompletionService 时,您将能够在每个作业完成后立即获得作业,这允许您(例如)将它们发送到另一个线程池进行处理,立即记录结果,等等。

ExecutorService threadPool = Executors.newFixedThreadPool(2);
ExecutorCompletionService<Result> compService
      = new ExecutorCompletionService<Result>(threadPool);
for (MyJob job : jobs) 
    compService.submit(job);

// shutdown the pool but the jobs submitted continue to run
threadPool.shutdown();
while (true) 
    Future<Result> future;
    // if pool has terminated (all jobs finished after shutdown) then poll() else take()
    if (threadPool.isTerminated()) 
        future = compService.poll();
        if (future == null) 
            break;
        
     else 
        // the take() blocks until any of the jobs complete
        // this joins with the jobs in the order they _finish_
        future = compService.take();
    
    // this get() won't block
    Result result = future.get();
    // you can then put the result in some other thread pool or something
    // to immediately start processing it
    someOtherThreadPool.submit(new SomeNewJob(result));

【讨论】:

while(!threadPool.isTerminated()) 不是很忙的正式等待吗? 它只是 take() 块,所以它没有旋转。我回答你的问题了吗@Sergio? 好的,谢谢!我正在研究如何限制Executors.newFixedThreadPool 内部的阻塞队列。特别是我正在使用ListenableFuture @Gray 我不明白你对while(!threadPool.isTerminated())的解释。为什么需要它?它有什么用途? isTerminate() 如果池已关闭且所有作业均已完成,则为真。这就是你要问的@tinkuge?【参考方案4】:

只考虑结果的顺序进行比较:

当我们使用CompletionService 时,每当提交的作业完成时,结果将被推送到队列(完成顺序)。那么提交作业的顺序和返回的结果就不再一样了。因此,如果您担心任务执行的顺序,请使用 CompletionService

其中 As invokeAll 返回代表任务的 Future 列表,其顺序与给定任务列表的迭代器生成的顺序相同,每个任务都已完成。

【讨论】:

以上是关于执行者完成服务?如果我们有invokeAll,为啥还需要一个?的主要内容,如果未能解决你的问题,请参考以下文章

是使用invokeAll还是submit - java Executor服务

java多线程

如何使用 invokeAll() 让所有线程池完成任务?

fork/join模式中fork和invokeAll的区别

为多个任务设置时限invokeAll

ExecutorService - invokeAll 和 invokeAny 使用场景