如何等待 ThreadPoolExecutor 完成

Posted

技术标签:

【中文标题】如何等待 ThreadPoolExecutor 完成【英文标题】:How to wait for a ThreadPoolExecutor to finish 【发布时间】:2012-06-11 15:32:42 【问题描述】:

我的问题:如何在ThreadPoolExecutor 上执行一堆线程对象并等待它们全部完成后再继续?

我是 ThreadPoolExecutor 的新手。所以这段代码是一个测试来了解它是如何工作的。现在我什至没有用对象填充BlockingQueue,因为我不明白如何在不使用另一个RunnableObject 调用execute() 的情况下启动队列。无论如何,现在我只是打电话给awaitTermination(),但我想我仍然错过了一些东西。任何提示都会很棒!谢谢。

public void testThreadPoolExecutor() throws InterruptedException 
  int limit = 20;
  BlockingQueue q = new ArrayBlockingQueue(limit);
  ThreadPoolExecutor ex = new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
  for (int i = 0; i < limit; i++) 
    ex.execute(new RunnableObject(i + 1));
  
  ex.awaitTermination(2, TimeUnit.SECONDS);
  System.out.println("finished");

RunnableObject 类:

package playground;

public class RunnableObject implements Runnable 

  private final int id;

  public RunnableObject(int id) 
    this.id = id;
  

  @Override
  public void run() 
    System.out.println("ID: " + id + " started");
    try 
      Thread.sleep(2354);
     catch (InterruptedException ignore) 
    
    System.out.println("ID: " + id + " ended");
  

【问题讨论】:

我认为在您发布问题后立即回答并建议您自己的答案不太合适不是一个好主意。充其量,这保证并更新您的原始问题,解释为什么该答案不够。 @Link,你是对的。我会解决的。 我刚刚编辑了我的答案,以展示一种在关闭执行程序之前等待所有作业完成的方法。 How to wait for all threads to finish, using ExecutorService?的可能重复 【参考方案1】:

您的问题似乎是您在将所有作业提交到您的池后没有调用shutdown。如果没有shutdown(),您的awaitTermination 将始终返回false。

ThreadPoolExecutor ex =
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) 
  ex.execute(new RunnableObject(i + 1));

// you are missing this line!!
ex.shutdown();
ex.awaitTermination(2, TimeUnit.SECONDS);

您还可以执行以下操作来等待所有工作完成:

List<Future<Object>> futures = new ArrayList<Future<Object>>();
for (int i = 0; i < limit; i++) 
  futures.add(ex.submit(new RunnableObject(i + 1), (Object)null));

for (Future<Object> future : futures) 
   // this joins with the submitted job
   future.get();

...
// still need to shutdown at the end
ex.shutdown();

另外,因为您正在休眠2354 毫秒,但只等待2 的所有作业终止SECONDSawaitTermination 将始终返回false。我会使用Long.MAX_VALUE 等待作业完成。

最后,听起来您正在担心创建一个新的ThreadPoolExecutor 而您想重用第一个。不要这样。与您为检测作业是否完成而编写的任何代码相比,GC 开销将非常小。


引用 javadocs,ThreadPoolExecutor.shutdown()

启动有序关闭,其中执行先前提交的任务,但不会接受新任务。如果已经关闭,则调用没有额外的效果。

ThreadPoolExecutor.awaitTermination(...)方法中,是等待执行者状态转到TERMINATED。但首先,如果调用了shutdown(),则状态必须转到SHUTDOWN;如果调用了shutdownNow(),则状态必须转到STOP

【讨论】:

【参考方案2】:

这是接受的答案的变体,如果/当抛出 InterruptedException 时处理重试:

executor.shutdown();

boolean isWait = true;

while (isWait)

    try
                 
        isWait = !executor.awaitTermination(10, TimeUnit.SECONDS);
        if (isWait)
        
            log.info("Awaiting completion of bulk callback threads.");
        
     catch (InterruptedException e) 
        log.debug("Interruped while awaiting completion of callback threads - trying again...");
    

【讨论】:

【参考方案3】:

另一种方法是使用 CompletionService,如果您必须尝试任何任务结果,这非常有用:

//run 3 task at time
final int numParallelThreads = 3;

//I used newFixedThreadPool for convenience but if you need you can use ThreadPoolExecutor
ExecutorService executor = Executors.newFixedThreadPool(numParallelThreads);
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);

int numTaskToStart = 15;

for(int i=0; i<numTaskToStart ; i++)
    //task class that implements Callable<String> (or something you need)
    MyTask mt = new MyTask();

    completionService.submit(mt);


executor.shutdown(); //it cannot be queued more task

try 
    for (int t = 0; t < numTaskToStart ; t++) 
        Future<String> f = completionService.take();
        String result = f.get();
        // ... something to do ...
    
 catch (InterruptedException e) 
    //termination of all started tasks (it returns all not started tasks in queue)
    executor.shutdownNow();
 catch (ExecutionException e) 
    // ... something to catch ...

【讨论】:

【参考方案4】:

你应该循环awaitTermination

ExecutorService threads;
// ...
// Tell threads to finish off.
threads.shutdown();
// Wait for everything to finish.
while (!threads.awaitTermination(10, TimeUnit.SECONDS)) 
  log.info("Awaiting completion of threads.");

【讨论】:

完全做到了。我没有注意到awaitTermination 返回任何东西。没有其他人提到我需要循环通过它来阻止。谢谢! 为什么要循环执行?为什么不增加等待它的秒数。通常我会使用awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS) 或其他方式。 @Gray - 我最初只是在我的一段代码中调用了它一次,有时在退出时会发生真正奇怪的事情,或者一切都被锁定了。最终我找到了它,所以现在我循环并记录了一条“等待”消息,所以我知道发生了一些奇怪的事情。 我喜欢 while 循环,因为我有一个处理文件的任务执行器,它们完成所需的时间是未知的(取决于文件的数量)【参考方案5】:

试试这个,

ThreadPoolExecutor ex =
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) 
  ex.execute(new RunnableObject(i + 1));

要添加的行

ex.shutdown();
ex.awaitTermination(timeout, unit)

【讨论】:

你应该检查从等待终止返回的布尔值并继续尝试直到你得到真实。【参考方案6】:

与执行者本身无关。只需使用界面的java.util.concurrent.ExecutorService.invokeAll(Collection&lt;? extends Callable&lt;T&gt;&gt;)。它将阻塞直到所有Callables 完成。

执行者注定要长寿;超出一组任务的生命周期。 shutdown 用于应用程序完成和清理时。

【讨论】:

您能否详细说明一下这个答案。我喜欢这个答案,但我很难实现它。谢谢

以上是关于如何等待 ThreadPoolExecutor 完成的主要内容,如果未能解决你的问题,请参考以下文章

如何在不关闭 Executor 的情况下等待 ThreadPoolExecutor 中的所有任务完成?

阿里推荐的线程使用方法 ThreadPoolExecutor

记录ThreadPoolExecutor主线程等待子线程问题

记录ThreadPoolExecutor主线程等待子线程问题

记录ThreadPoolExecutor主线程等待子线程问题

ThreadPoolExecutor中的核心线程数、最大线程数区别详解