ThreadPoolExecutor 动态任务执行,等到所有任务完成

Posted

技术标签:

【中文标题】ThreadPoolExecutor 动态任务执行,等到所有任务完成【英文标题】:ThreadPoolExecutor dynamic task execution, wait until all task completion 【发布时间】:2020-05-13 13:50:03 【问题描述】:

我有一个 ThreadPoolExecutor 这样的

ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

任务执行如下

executor.execute(task)

现在每个任务可能还可以向同一个执行者执行更多任务,这些新任务可以提交更多任务

问题是我想让主线程等到所有任务都执行完再调用shutdown

是否保证以下方法有效? (即阻塞/等待主 线程直到所有任务完成)

while (executor.getCompletedTaskCount() < executor.getTaskCount()) 
      try 
        Thread.sleep(100);
       catch (InterruptedException e) 
        LOGGER.error("Exception in atomic Count wait thread sleep", e);
        break;
      
    

这最终会打破循环吗?仅通过初步测试,我发现它即使在线程中出现异常也可以工作

附言 我不能使用闩锁,因为我事先不知道任务的数量 也没有接受的答案here

【问题讨论】:

executor 上调用shutdown,然后在while 循环中检查所有任务是否已完成。睡几毫秒,然后再次检查。 @M.Deinum 最初调用shutdown(仅在初始任务之后)将导致执行者不接受更多任务,而每个任务可以在同一个executor上执行更多任务 也意识到了这一点。您的循环可以工作,但我也怀疑您还需要检查队列是否为空。不确定队列是否可以包含尚未添加到taskCount 的任务。 你有一个竞争条件。是否可以保证在完成的任务计数增加之前任务计数会增加?您可能应该提出一些标准来了解所有任务何时已提交。 一般来说,同类型的任务是不是需要从其他任务里面再提交一次呢?还是可以将它们定义为子任务?你能分享一下为什么任务再次提交相同类型的任务很重要吗?一个例子可能有助于理解这一点。另外,这些任务是依赖于彼此的生命周期还是独立的? 【参考方案1】:

您可能应该保留提交的期货。

Deque<Future<?>> futures = new ConcurrentLinkedDeque<>();

然后每次提交任务。

futures.add(executor.submit( runnable, "Doesn't Really Matter, but Can be Useful"));

然后在等待的主线程中。

while(futures.size()>0)
    futures.pop().get();

这将为您提供保证 .get 在任务完成之前不会完成,如果另一个任务添加了更多任务,那么未来将在原始任务完成之前反映更改。

【讨论】:

请注意,使用while (!futures.isEmpty()) 会更惯用。 您可能会认为它超出了此答案的范围,但如果此解决方案要正常工作,您必须处理任务失败。如果任务失败,对#get() 的调用将引发ExecutionException,这将导致while 循环中断。【参考方案2】:

在我看来,获得任务的实际计数是不确定的,因为在提交任务时调用 execute 方法并且可能会发生以下 3 种情况之一。 1.任务开始执行(添加到Workers) 2.任务入队(加入WorkQueue) 3.Task因为WorkerQueue容量、Workers容量和资源耗尽而被拒绝

 /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current @code RejectedExecutionHandler.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         @code RejectedExecutionHandler, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if @code command is null
     */
    public void execute(Runnable command) 
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) 
            if (addWorker(command, true))
                return;
            c = ctl.get();
        
        if (isRunning(c) && workQueue.offer(command)) 
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        
        else if (!addWorker(command, false))
            reject(command);
    

getTaskCount() 和 getCompletedTaskCount() 方法由 mainLock 保护,因此我们确实知道内部线程是否仍在向执行器提交任务将通过主执行中的时间检查 (while (executor.getCompletedTaskCount() &lt; executor.getTaskCount())) 来完成。这种情况可能会导致暂时的误报,最终导致错误的结果。

/**
     * Returns the approximate total number of tasks that have ever been
     * scheduled for execution. Because the states of tasks and
     * threads may change dynamically during computation, the returned
     * value is only an approximation.
     *
     * @return the number of tasks
     */
    public long getTaskCount() 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            long n = completedTaskCount;
            for (Worker w : workers) 
                n += w.completedTasks;
                if (w.isLocked())
                    ++n;
            
            return n + workQueue.size();
         finally 
            mainLock.unlock();
        
    
    /**
     * Returns the approximate total number of tasks that have
     * completed execution. Because the states of tasks and threads
     * may change dynamically during computation, the returned value
     * is only an approximation, but one that does not ever decrease
     * across successive calls.
     *
     * @return the number of tasks
     */
    public long getCompletedTaskCount() 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
         finally 
            mainLock.unlock();
        
    

**此处使用的代码片段来自 JDK 1.8 222

【讨论】:

【参考方案3】:

根据 Java (8) 文档,用于获取已完成计数和已提交计数的方法(即 executor.getCompletedTaskCount() &amp; executor.getTaskCount())并不总是提供 100% 准确的计数,因此该方法可能并不总是有效。

public long getTaskCount()

返回近似的任务总数 预定执行。因为任务和线程的状态可能 在计算过程中动态变化,返回的值只是一个 近似

public long getCompletedTaskCount()

返回大约已完成的任务总数 执行。因为任务和线程的状态可能会改变 在计算过程中动态地,返回的值只是一个 近似值,但在连续的过程中不会减少 来电

【讨论】:

以上是关于ThreadPoolExecutor 动态任务执行,等到所有任务完成的主要内容,如果未能解决你的问题,请参考以下文章

ThreadPoolExecutor:拉出挂起的任务

多线程-ThreadPoolExecutor

JUC并发编程 共享模式之工具 ThreadPoolExecutor -- 任务调度线程池 定时任务 / 延时执行(ScheduledThreadPoolExecutor 延时执行 / 定时执行)(代

ThreadPoolExecutor 线程池

ThreadPoolExecutor参数

高并发通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程