Java ExecutorService:等待终止所有递归创建的任务
Posted
技术标签:
【中文标题】Java ExecutorService:等待终止所有递归创建的任务【英文标题】:Java ExecutorService: awaitTermination of all recursively created tasks 【发布时间】:2011-06-24 21:44:51 【问题描述】:我使用ExecutorService
来执行任务。这个任务可以递归地创建提交给同一个ExecutorService
的其他任务,这些子任务也可以这样做。
我现在有一个问题,我想等到所有任务都完成(即所有任务都完成并且他们没有提交新任务)后再继续。
我不能在主线程中调用ExecutorService.shutdown()
,因为这会阻止ExecutorService
接受新任务。
如果 shutdown
没有被调用,调用 ExecutorService.awaitTermination()
似乎没有任何作用。
所以我有点卡在这里。 ExecutorService
看到所有工作人员都处于空闲状态并不是那么难,不是吗?我能想出的唯一不优雅的解决方案是直接使用ThreadPoolExecutor
并每隔一段时间查询它的getPoolSize()
。真的没有更好的办法吗?
【问题讨论】:
【参考方案1】:您可以使用跟踪正在运行的线程的运行器:
Runner runner = Runner.runner(numberOfThreads);
runner.runIn(2, SECONDS, callable);
runner.run(callable);
// blocks until all tasks are finished (or failed)
runner.waitTillDone();
// and reuse it
runner.runRunnableIn(500, MILLISECONDS, runnable);
runner.waitTillDone();
// and then just kill it
runner.shutdownAndAwaitTermination();
要使用它,您只需添加一个依赖项:
编译'com.github.matejtymes:javafixes:1.3.0'
【讨论】:
【参考方案2】:我能想出的唯一不优雅的解决方案是直接使用 ThreadPoolExecutor 并每隔一段时间查询它的 getPoolSize()。真的没有更好的办法吗?
您必须以正确的顺序使用shutdown() ,
awaitTermination()and shutdownNow()
方法。
shutdown()
:启动有序关闭,执行之前提交的任务,但不会接受新任务。
awaitTermination()
:在关闭请求后阻塞,直到所有任务都完成执行,或者发生超时,或者当前线程被中断,以先发生者为准。
shutdownNow()
:尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。
来自ExecutorService的oracle文档页面的推荐方式:
void shutdownAndAwaitTermination(ExecutorService pool)
pool.shutdown(); // Disable new tasks from being submitted
try
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
catch (InterruptedException ie)
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
如果完成以下任务的持续时间较长,您可以将if条件替换为while条件:
改变
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
到
while(!pool.awaitTermination(60, TimeUnit.SECONDS))
Thread.sleep(60000);
您可以参考其他替代方案(join()
除外,可与独立线程一起使用):
wait until all threads finish their work in java
【讨论】:
【参考方案3】:您可以创建自己的线程池来扩展ThreadPoolExecutor。您想知道任务何时提交以及何时完成。
public class MyThreadPoolExecutor extends ThreadPoolExecutor
private int counter = 0;
public MyThreadPoolExecutor()
super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
@Override
public synchronized void execute(Runnable command)
counter++;
super.execute(command);
@Override
protected synchronized void afterExecute(Runnable r, Throwable t)
super.afterExecute(r, t);
counter--;
notifyAll();
public synchronized void waitForExecuted() throws InterruptedException
while (counter == 0)
wait();
【讨论】:
我比得分为 13 的解决方案更喜欢这个解决方案。但是:“while (counter == 0)”应该是“while (counter > 0)”,对吧??! 【参考方案4】:为您的任务使用Future(而不是提交Runnable
),回调会在完成时更新其状态,因此您可以使用Future.isDone 跟踪所有任务的状态。
【讨论】:
如何获得 Future 的回调?以为你必须调用 .get 就可以了。 当他说回调时,他的意思是你从调用方法返回的值 你做了,我的意思是为你设置了“完成”标志(通过回调)。我已经改写了答案以使其更加明确。 好的,所以他还需要轮询 isDone 吗?只是想确定一下。 @John V. - callable 上的 call 方法返回结果 - 如果需要,请等待它。当你提交一个 Callable 时,你会得到一个 Future。也许我们对回调的定义不同。 不,你是对的,我想念你对他的意思的提问。【参考方案5】:使用CountDownLatch。 将 CountDownLatch 对象传递给您的每个任务,并为您的任务编写如下代码。
public void doTask()
// do your task
latch.countDown();
而需要等待的线程应该执行以下代码:
public void doWait()
latch.await();
当然,这假设您已经知道子任务的数量,以便您可以初始化锁存器的计数。
【讨论】:
锁存器应该初始化成什么? CountDownLatch 存在一个问题,即一旦创建计数就无法重置计数。我假设他不知道系统将调用的任务数量。 是的……我知道,我以为他可以事先知道任务的数量 其实我事先不知道任务数量:)还是谢谢!【参考方案6】:我必须说,上面描述的递归调用任务和等待结束子任务问题的解决方案并不能让我满意。我的解决方案灵感来自 Oracle 的原始文档:CountDownLatch 和示例:Human resources CountDownLatch。
类 HRManagerCompact 实例中的第一个公共线程有两个子线程的等待锁存器,其后续 2 个子线程的等待锁存器......等等。
当然,latch可以设置为不等于2的值(在CountDownLatch的构造函数中),也可以在迭代中建立可运行对象的数量即ArrayList,但必须对应(倒计时的数量必须是等于 CountDownLatch 构造函数中的参数)。
注意,锁存器的数量会根据限制条件呈指数增长: 'level.get()
package processes.countdownlatch.hr;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/** Recursively latching running classes to wait for the peak threads
*
* @author hariprasad
*/
public class HRManagerCompact extends Thread
final int N = 2; // number of daughter's tasks for latch
CountDownLatch countDownLatch;
CountDownLatch originCountDownLatch;
AtomicInteger level = new AtomicInteger(0);
AtomicLong order = new AtomicLong(0); // id latched thread waiting for
HRManagerCompact techLead1 = null;
HRManagerCompact techLead2 = null;
HRManagerCompact techLead3 = null;
// constructor
public HRManagerCompact(CountDownLatch countDownLatch, String name,
AtomicInteger level, AtomicLong order)
super(name);
this.originCountDownLatch=countDownLatch;
this.level = level;
this.order = order;
private void doIt()
countDownLatch = new CountDownLatch(N);
AtomicInteger leveli = new AtomicInteger(level.get() + 1);
AtomicLong orderi = new AtomicLong(Thread.currentThread().getId());
techLead1 = new HRManagerCompact(countDownLatch, "first", leveli, orderi);
techLead2 = new HRManagerCompact(countDownLatch, "second", leveli, orderi);
//techLead3 = new HRManagerCompact(countDownLatch, "third", leveli);
techLead1.start();
techLead2.start();
//techLead3.start();
try
synchronized (Thread.currentThread()) // to prevent print and latch in the same thread
System.out.println("*** HR Manager waiting for recruitment to complete... " + level + ", " + order + ", " + orderi);
countDownLatch.await(); // wait actual thread
System.out.println("*** Distribute Offer Letter, it means finished. " + level + ", " + order + ", " + orderi);
catch (InterruptedException e)
e.printStackTrace();
@Override
public void run()
try
System.out.println(Thread.currentThread().getName() + ": working... " + level + ", " + order + ", " + Thread.currentThread().getId());
Thread.sleep(10*level.intValue());
if (level.get() < 2) doIt();
Thread.yield();
catch (Exception e)
// TODO Auto-generated catch block
e.printStackTrace();
/*catch (InterruptedException e)
// TODO Auto-generated catch block
e.printStackTrace();
*/
// TODO Auto-generated method stub
System.out.println("--- " +Thread.currentThread().getName() + ": recruted " + level + ", " + order + ", " + Thread.currentThread().getId());
originCountDownLatch.countDown(); // count down
public static void main(String args[])
AtomicInteger levelzero = new AtomicInteger(0);
HRManagerCompact hr = new HRManagerCompact(null, "zero", levelzero, new AtomicLong(levelzero.longValue()));
hr.doIt();
可能的注释输出(有一定的概率):
first: working... 1, 1, 10 // thread 1, first daughter's task (10)
second: working... 1, 1, 11 // thread 1, second daughter's task (11)
first: working... 2, 10, 12 // thread 10, first daughter's task (12)
first: working... 2, 11, 14 // thread 11, first daughter's task (14)
second: working... 2, 11, 15 // thread 11, second daughter's task (15)
second: working... 2, 10, 13 // thread 10, second daughter's task (13)
--- first: recruted 2, 10, 12 // finished 12
--- first: recruted 2, 11, 14 // finished 14
--- second: recruted 2, 10, 13 // finished 13 (now can be opened latch 10)
--- second: recruted 2, 11, 15 // finished 15 (now can be opened latch 11)
*** HR Manager waiting for recruitment to complete... 0, 0, 1
*** HR Manager waiting for recruitment to complete... 1, 1, 10
*** Distribute Offer Letter, it means finished. 1, 1, 10 // latch on 10 opened
--- first: recruted 1, 1, 10 // finished 10
*** HR Manager waiting for recruitment to complete... 1, 1, 11
*** Distribute Offer Letter, it means finished. 1, 1, 11 // latch on 11 opened
--- second: recruted 1, 1, 11 // finished 11 (now can be opened latch 1)
*** Distribute Offer Letter, it means finished. 0, 0, 1 // latch on 1 opened
【讨论】:
【参考方案7】:这确实是 Phaser 的理想人选。 Java 7 即将推出这个新类。它是一个灵活的 CountdonwLatch/CyclicBarrier。您可以在JSR 166 Interest Site 获得稳定版本。
它是更灵活的 CountdownLatch/CyclicBarrier 的方式是因为它不仅能够支持未知数量的参与方(线程),而且还可以重复使用(这就是阶段部分的来源)
对于您提交的每项任务,您都会注册,当该任务完成时,您就会到达。这可以递归完成。
Phaser phaser = new Phaser();
ExecutorService e = //
Runnable recursiveRunnable = new Runnable()
public void run()
//do work recursively if you have to
if(shouldBeRecursive)
phaser.register();
e.submit(recursiveRunnable);
phaser.arrive();
public void doWork()
int phase = phaser.getPhase();
phaser.register();
e.submit(recursiveRunnable);
phaser.awaitAdvance(phase);
编辑:感谢@depthofreality 在我之前的示例中指出竞争条件。我正在更新它,以便执行线程仅等待当前阶段的推进,因为它阻塞递归函数完成。
阶段号直到arrive
s == register
s 的数量才会跳闸。由于在每次递归调用调用register
之前,当所有调用完成时都会发生阶段增量。
【讨论】:
很好,Phasers 似乎是我需要的。我想坚持使用当前的标准 Java 库,但一旦它出现,我就会使用它们。感谢您的提示! 我知道它是很久以前发布的。我仍然想知道这里是否有比赛条件。在 recursiveRunnable 向移相器注册之前 doWork() 不能完成吗? @depthofreality 这是一个很好的观点。你是对的,这里肯定会有一场比赛(当我快速把这个例子放在一起时一定忽略了它)。我马上更新。 @JohnVint 感谢您的澄清和修复。我想现在还有另一个问题。有些派对可能会在没有注册的情况下到达。 @depthofreality 我考虑过,但事实并非如此。注册的第一方在doWork
中完成,因为下面的移相器不需要arriveAndAwaitAdvance
recursiveRunnable
需要到达(确实如此)。在执行到 ExecutorService 之前还要注意可运行的register
s【参考方案8】:
如果您想使用 JSR166y 类 - 例如Phaser 或 Fork/Join——其中任何一个都可能适合您,您始终可以从以下位置下载它们的 Java 6 反向移植:http://gee.cs.oswego.edu/dl/concurrency-interest/ 并将其用作基础,而不是编写完全自制的解决方案。然后当 7 出现时,您可以删除对 backport 的依赖并更改一些包名称。
(完全披露:我们已经在 prod 中使用 LinkedTransferQueue 有一段时间了。没有问题)
【讨论】:
【参考方案9】:(mea culpa:我的就寝时间有点过了;)但这是第一次尝试动态闩锁):
package oss.alphazero.sto4958330;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class DynamicCountDownLatch
@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer
private final CountDownLatch toplatch;
public Sync()
setState(0);
this.toplatch = new CountDownLatch(1);
@Override
protected int tryAcquireShared(int acquires)
try
toplatch.await();
catch (InterruptedException e)
throw new RuntimeException("Interrupted", e);
return getState() == 0 ? 1 : -1;
public boolean tryReleaseShared(int releases)
for (;;)
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
public boolean tryExtendState(int acquires)
for (;;)
int s = getState();
int exts = s+1;
if (compareAndSetState(s, exts))
toplatch.countDown();
return exts > 0;
private final Sync sync;
public DynamicCountDownLatch()
this.sync = new Sync();
public void await()
throws InterruptedException
sync.acquireSharedInterruptibly(1);
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
public void countDown()
sync.releaseShared(1);
public void join()
sync.tryExtendState(1);
此锁存器为现有(克隆的)CountDownLatch API 引入了一个新方法 join(),任务使用该方法表示它们进入更大的任务组。
latch 从父任务传递到子任务。根据 Suraj 的模式,每个任务都会首先“join()”锁存器,执行其 task(),然后 countDown()。
为了解决主线程启动任务组然后立即等待()的情况——在任何任务线程甚至有机会加入()之前——topLatch
用于 int 内部 Sync
班级。这是一个锁存器,每次 join() 都会被倒计时;当然,只有第一个倒计时很重要,因为所有后续倒计时都是 nops。
上面的初始实现确实引入了某种语义上的皱纹,因为 tryAcquiredShared(int) 不应该抛出 InterruptedException,但我们确实需要处理 topLatch 上的等待中断。
这是对 OP 自己使用原子计数器的解决方案的改进吗?我会说可能不是 IFF,他坚持使用 Executors,但我相信,在这种情况下,它是使用 AQS 的同样有效的替代方法,并且也可用于通用线程。
击退其他黑客。
【讨论】:
【参考方案10】:如果递归任务树中的任务数量最初是未知的,也许最简单的方法是实现您自己的同步原语,某种“反向信号量”,并在您的任务之间共享它。在提交每个任务之前,您增加一个值,当任务完成时,它会减少该值,然后等待该值变为 0。
将其实现为从任务中显式调用的单独原语可以将此逻辑与线程池实现分离,并允许您将多个独立的递归任务树提交到同一个池中。
类似这样的:
public class InverseSemaphore
private int value = 0;
private Object lock = new Object();
public void beforeSubmit()
synchronized(lock)
value++;
public void taskCompleted()
synchronized(lock)
value--;
if (value == 0) lock.notifyAll();
public void awaitCompletion() throws InterruptedException
synchronized(lock)
while (value > 0) lock.wait();
请注意,taskCompleted()
应在 finally
块内调用,以使其免受可能的异常的影响。
还要注意beforeSubmit()
应该在提交任务之前由提交线程调用,而不是由任务本身调用,以避免在旧任务完成而新任务尚未启动时可能出现的“错误完成”。
编辑:修复了使用模式的重要问题。
【讨论】:
正在回答类似的问题 - 他可以通过使用 AtomicInteger 得到。 @SB:使用 AtomicInteger,您无法等待完成而不忙。 有一个错字,你在做lock--而不是value-- @axtavt 您仍然可以在 InverseSemaphore 中使用 AtomicInteger 而不是 int。那时你就不需要围绕它进行同步了。 @dogbane 对这个答案没有帮助,因为等待时需要同步。【参考方案11】:哇,你们真快:)
感谢您的所有建议。 Futures 不容易与我的模型集成,因为我不知道事先安排了多少可运行对象。因此,如果我让父任务保持活动状态只是为了等待它的递归子任务完成,那么我周围就会有很多垃圾。
我使用 AtomicInteger 建议解决了我的问题。本质上,我将 ThreadPoolExecutor 子类化,并在调用 execute() 时递增计数器,并在调用 afterExecute() 时递减计数器。当计数器变为 0 时,我调用 shutdown()。这似乎对我的问题有用,不确定这是否是一种普遍的好方法。特别是,我假设您只使用 execute() 来添加 Runnables。
作为一个侧节点:我首先尝试在 afterExecute() 中检查队列中的 Runnable 数量以及当它们为 0 时处于活动状态和关闭的工作人员数量;但这不起作用,因为并非所有 Runnable 都出现在队列中,而且 getActiveCount() 也没有达到我的预期。
无论如何,这是我的解决方案:(如果有人发现此问题严重,请告诉我:)
public class MyThreadPoolExecutor extends ThreadPoolExecutor
private final AtomicInteger executing = new AtomicInteger(0);
public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime,
TimeUnit seconds, BlockingQueue<Runnable> queue)
super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue);
@Override
public void execute(Runnable command)
//intercepting beforeExecute is too late!
//execute() is called in the parent thread before it terminates
executing.incrementAndGet();
super.execute(command);
@Override
protected void afterExecute(Runnable r, Throwable t)
super.afterExecute(r, t);
int count = executing.decrementAndGet();
if(count == 0)
this.shutdown();
【讨论】:
虽然这可以满足您的特定要求,但它不是一个通用解决方案(考虑到在您递减然后测试 count == 0 的值后可能存在的竞争条件。)一般解决方案是使用 AbstractQueuedSynchronizer 滚动您自己的“动态”倒计时闩锁。 您遇到的问题是执行器不知道您何时停止添加任务。如果在任何时候您的所有任务都在您完成添加之前完成,这些任务将被拒绝,因为池已关闭。 @PeterLawrey 对,但有一个简单的解决方案:最初增加计数器并在完成添加后减少它。或者使用“adder task”来添加所有任务。以上是关于Java ExecutorService:等待终止所有递归创建的任务的主要内容,如果未能解决你的问题,请参考以下文章
ExecutorService对象的shutdown 和shutdownNow 的区别
聊聊高并发(三十九)解析java.util.concurrent各个组件(十五) 理解ExecutorService接口的设计