如何在不关闭 Executor 的情况下等待 ThreadPoolExecutor 中的所有任务完成?

Posted

技术标签:

【中文标题】如何在不关闭 Executor 的情况下等待 ThreadPoolExecutor 中的所有任务完成?【英文标题】:How to wait for all tasks in an ThreadPoolExecutor to finish without shutting down the Executor? 【发布时间】:2011-04-25 04:02:29 【问题描述】:

我不能使用shutdown()awaitTermination(),因为它可能会在等待时将新任务添加到 ThreadPoolExecutor。

所以我正在寻找一种方法,等待 ThreadPoolExecutor 清空它的队列并完成它的所有任务,而不停止在此之前添加新任务。

如果有什么不同,这适用于 android

谢谢

更新:很多周后,我重新审视了这个,发现修改后的 CountDownLatch 在这种情况下更适合我。我会保留答案,因为它更适用于我的问题。

【问题讨论】:

如果您对添加的新任务感到满意,但如果它永远不会完成会发生什么? 我认为 littleFluffyKitty 只想等待“旧”任务完成。 我不太担心它永远不会完成的可能性,因为如果是这样的话,那么其他东西已经被严重破坏了。如果一切都失败了,我可以实施某种时间,但我可以假设它会完成。我希望它能够在等待时执行新任务,或者换句话说,我希望在调用等待后能够添加新任务。 我认为这是一个相关的问题:***.com/questions/3402895/java-threadpool-usage。您可能想查看那里列出的答案... 我提供了另一个可能的答案。我还是不明白你说的等待所有任务清空队列但又不想阻止新任务进入队列是什么意思……在某些时候,你必须画线,而TPE无法画线给你的线。调用shutdown() 是创建无法提交新任务的“边缘”的方式;调用awaitTermination() 会创建一个优势,确保您阻塞直到所有先前提交的任务都完成。 【参考方案1】:

如果您想知道某项任务或某批任务何时完成,您可以使用ExecutorService.submit(Runnable)。调用此方法会返回一个 Future 对象,该对象可以放入一个 Collection 中,然后您的主线程将遍历每个对象调用 Future.get()。这将导致您的主线程暂停执行,直到 ExecutorService 处理完所有 Runnable 任务。

Collection<Future<?>> futures = new LinkedList<Future<?>>();
futures.add(executorService.submit(myRunnable));
for (Future<?> future:futures) 
    future.get();

【讨论】:

+1 这似乎是最好的方法。必须在应用程序级别完成,但是,在您提交任务的地方,而不是在执行程序服务级别。 +1,或者在一批任务上使用 invokeAll() 来等待完成。在这里查看我的答案:***.com/questions/3269445/… 对,如果您愿意将您的任务设为Callable 而不是Runnable,那么@andersoj 引用的解决方案要简单得多。 Y,注意 Runnable->Callable 与方便方法 Executors.callable() 无关紧要 @littleFluffyKitty,在原始答案中,只要变量在范围内并继续指向堆中的该数据结构,LinkedList 的内存就会存在于堆空间中。如果您在本地将变量定义为某个方法的一部分,它将在该方法的持续时间内存在。我真的不知道你打算如何使用这个 sn-p。当然,我是像 Java 开发者一样思考,Android 不是 Java。【参考方案2】:

我的场景是一个网络爬虫,用于从网站获取一些信息,然后对其进行处理。一个 ThreadPoolExecutor 用于加快进程,因为可以同时加载许多页面。因此,新任务将在现有任务中创建,因为爬虫将跟踪每个页面中的超链接。问题是一样的:主线程不知道什么时候所有的任务都完成了,它可以开始处理结果。我用一种简单的方法来确定这一点。它不是很优雅,但在我的情况下有效:

while (executor.getTaskCount()!=executor.getCompletedTaskCount())
    System.err.println("count="+executor.getTaskCount()+","+executor.getCompletedTaskCount());
    Thread.sleep(5000);

executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);

【讨论】:

【参考方案3】:

也许您正在寻找CompletionService 来管理批量任务,另请参阅this answer。

【讨论】:

你的链接好像坏了。 @qwertzguy:替换为更稳定的lamer。谢谢。【参考方案4】:

(这是尝试通过我自己的调整来重现 Thilo 之前删除的答案。)

我认为您可能需要澄清您的问题,因为有一个隐含的无限条件......在某些时候您必须决定关闭您的执行程序,此时它将不再接受任何任务。您的问题似乎暗示您想等到知道不会提交更多任务,您只能在自己的应用程序代码中知道。

以下答案将允许您顺利过渡到新 TPE(无论出于何种原因),完成所有当前提交的任务,并且不会拒绝新 TPE 的新任务。它可能会回答你的问题。 @Thilo 也可能。

假设您已经在某处定义了一个可见的 TPE:

AtomicReference<ThreadPoolExecutor> publiclyAvailableTPE = ...;

然后您可以这样编写 TPE 交换例程。也可以使用同步方法编写,但我认为这更简单:

void rotateTPE()

   ThreadPoolExecutor newTPE = createNewTPE();
   // atomic swap with publicly-visible TPE
   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(newTPE);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in  
   // the previously visible TPE
   oldTPE.awaitTermination();
 

或者,如果您真的不想杀死线程池,那么您的提交者端将需要在某个时候处理被拒绝的任务,您可以将null 用于新的 TPE:

void killTPE()

   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(null);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in  
   // the previously visible TPE
   oldTPE.awaitTermination();
 

这可能会导致上游问题,调用者需要知道如何处理null

您也可以用一个简单地拒绝每个新执行的虚拟 TPE 进行替换,但这相当于您在 TPE 上调用 shutdown() 时发生的情况。

【讨论】:

感谢您抽出宝贵的时间来写这篇文章,我会看看所有内容,看看哪条路线效果最好。【参考方案5】:

如果您不想使用shutdown,请遵循以下方法:

    ExecutorService 上的提交遍历所有Future 任务,并按照Tim Bender 的建议在Future 对象上使用阻塞调用get() 检查状态

    使用其中一种

      ExecutorService 上使用invokeAll 使用CountDownLatch 使用ForkJoinPool 或newWorkStealingPool 的Executors(从java 8 开始)

executor服务上的invokeAll()也达到CountDownLatch的相同目的

相关的 SE 问题:

How to wait for a number of threads to complete?

【讨论】:

【参考方案6】:

您可以在 Runner 类上调用 waitTillDone()

Runner runner = Runner.runner(10);

runner.runIn(2, SECONDS, runnable);
runner.run(runnable); // each of this runnables could submit more tasks

runner.waitTillDone(); // blocks until all tasks are finished (or failed)

// and now reuse it

runner.runIn(500, MILLISECONDS, callable);

runner.waitTillDone();
runner.shutdown();

要使用它,请将此 gradle/maven 依赖项添加到您的项目中:'com.github.matejtymes:javafixes:1.0'

更多详情请看这里:https://github.com/MatejTymes/JavaFixes 或这里:http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

【讨论】:

【参考方案7】:

尝试使用队列大小和活动任务计数,如下所示

 while (executor.getThreadPoolExecutor().getActiveCount() != 0 || !executor.getThreadPoolExecutor().getQueue().isEmpty())
                     try 
                Thread.sleep(500);
             catch (InterruptedException e) 
            
        

【讨论】:

以上是关于如何在不关闭 Executor 的情况下等待 ThreadPoolExecutor 中的所有任务完成?的主要内容,如果未能解决你的问题,请参考以下文章

如何在不抛出 TaskCanceledExceptions 的情况下等待任务?

如何在不等待完成的情况下调用存储过程?

如何在不冻结整个线程的情况下让函数等待

如何在不等待 C++ 响应的情况下启动线程?

C#如何在不等待完成的情况下启动异步方法?

如何在不等待 1 秒的情况下对进程的 CPU 使用情况进行采样