Java Executors:如何停止提交的任务?

Posted

技术标签:

【中文标题】Java Executors:如何停止提交的任务?【英文标题】:Java Executors: how can I stop submitted tasks? 【发布时间】:2010-11-27 22:19:36 【问题描述】:

我已经使用执行器提交了一个任务,我需要它在一段时间后停止(例如 5 分钟)。我试过这样做:

   for (Future<?> fut : e.invokeAll(tasks, 300, TimeUnit.SECONDS)) 
         try 
             fut.get(); 
          catch (CancellationException ex) 
             fut.cancel(true);   
             tasks.clear();
          catch(ExecutionException ex)
             ex.printStackTrace(); //FIXME: gestita con printstack       
         
   

但我总是得到一个错误:我有一个共享向量需要由任务修改然后由线程读取,即使我停止所有任务,如果发生超时,我会得到:

Exception in thread "Thread-1" java.util.ConcurrentModificationException

有什么问题吗?如何停止提交的 5 分钟后仍在工作的任务?

【问题讨论】:

@Raffaele Di Fazio:我已格式化代码 - 并添加了右大括号,请检查准确性。 谢谢,很抱歉格式错误。 【参考方案1】:

仅仅因为您在Future 上调用cancel() 并不意味着任务会自动停止。您必须在任务中做一些工作以确保它会停止:

使用cancel(true) 以便向任务发送中断。 处理InterruptedException。如果您的任务中的某个函数抛出 InterruptedException,请确保在捕获异常后尽快正常退出。 如果任务进行连续计算,请定期检查Thread.currentThread().isInterrupted()

例如:

class LongTask implements Callable<Double> 
    public Double call() 
        
         // Sleep for a while; handle InterruptedException appropriately
         try 
             Thread.sleep(10000);
          catch (InterruptedException ex) 
             System.out.println("Exiting gracefully!");
             return null;
         


         // Compute for a while; check Thread.isInterrupted() periodically
         double sum = 0.0;
         for (long i = 0; i < 10000000; i++) 
             sum += 10.0
             if (Thread.currentThread().isInterrupted()) 
                 System.out.println("Exiting gracefully");
                 return null;
             
         

         return sum;
     

另外,正如其他帖子所提到的:即使使用线程安全的Vector 类也可以抛出ConcurrentModificationException,因为从Vector 获得的迭代器不是线程安全的,因此需要同步。增强的 for 循环使用迭代器,因此请注意:

final Vector<Double> vector = new Vector<Double>();
vector.add(1.0);
vector.add(2.0);

// Not thread safe!  If another thread modifies "vector" during the loop, then
// a ConcurrentModificationException will be thrown.
for (Double num : vector) 
    System.out.println(num);


// You can try this as a quick fix, but it might not be what you want:
synchronized (vector)     // "vector" must be final
    for (Double num : vector) 
        System.out.println(num);
    

【讨论】:

非常好——不知何故我从来没有遇到过 Thread.interrupted()——我明天可以用它! 首先,调用future.cancel(true) 完全没有任何作用。 invokeAll 的合约声明它将在返回之前取消任务,并且实现使用 final 块来确保它。其次,永远不要调用 Thread.interrupted(),这样做会清除线程的中断状态。大多数实现都希望使用 Thread.isInterrupted()。清除标志应该被仔细检查。第三,他不必处理 InterruptedException,除非他使用锁获取等阻塞方法,然后编译器确保他是。 FutureTask 将捕获异常。 @Tim Bender:你是对的:future.cancel(true) 什么都不做,我自己测试过。但我不明白你认为我应该怎么做.. @Tim Bender:您对 Thread.interrupted() 与 Thread.isInterrupted() 的看法是完全正确的。实际的语法是 Thread.currentThread().isInterrupted()。 @Tim Bender:您不必捕获 InterruptedException,但如果您想在方法退出之前处理清理,您可能需要捕获并重新抛出。【参考方案2】:

ConcurrentModificationException 来自您对 tasks.clear() 的调用,而您的 Executors 正在迭代您的 tasks Vector。您可以尝试在您的 ExecutorService 上调用 shutdownNow()

【讨论】:

【参考方案3】:

ConcurrentModificationException 最常见的情况是 vector 在迭代的同时被修改。通常这将在一个线程中完成。您需要在整个迭代过程中锁定Vector(并注意不要死锁)。

【讨论】:

是的,我知道为什么抛出异常,但它不应该。迭代是在我发布的部分代码之后进行的,因此,如果代码运行良好,我不应该得到异常......【参考方案4】:

fut.get() 是一个阻塞调用,即使在超时之后,你也会阻塞直到任务完成。如果您想在 5 分钟标记处尽可能停止,您确实需要检查中断标志,我只是建议您使用保留中断状态的 Thread.isInterrupted() 方法这样做。如果您只想立即停止并且不需要清除任何状态,则抛出一个异常,该异常将被 Future 捕获并指示为 ExecutionException。

fut.cancel(true) 没有做任何事情,因为 invokeAll() 方法已经为您完成了。

除非您在其他地方使用“任务”集合,否则您可能不需要对其调用 clear()。这不会成为问题的根源,因为在您调用 clear() 时,invokeAll() 方法已使用 List 完成。但是,如果您需要开始形成要执行的新任务列表,我建议您形成一个新的任务列表,而不是使用旧的新任务列表。

很遗憾,对于您的问题,我没有答案。我在这里看不到足够的信息来诊断它。您提供的代码 sn-p 中没有任何内容表明对库类/方法的使用不当(只是不必要的)。如果您包含完整的堆栈跟踪,而不是单行错误,也许可以。

【讨论】:

我在其他地方使用了该集合,并且它处于while循环中,因此需要将其清除才能在循环重复时为空。当然我可以在帖子中显示的代码之后执行 clear() ,这应该没问题。我的问题的重要部分不是例外:我需要知道的是如何在 5 分钟后停止未来,当然,我会按照您的建议尝试抛出异常。我什至可以更改提交任务的方式。我是在这里学到的:***.com/questions/1322147/…【参考方案5】:

fut.cancel(true);放在finally块中

【讨论】:

以上是关于Java Executors:如何停止提交的任务?的主要内容,如果未能解决你的问题,请参考以下文章

多线程-Executors和Executor,线程池

Java:自定义ScheduledExecutorService来实现定时提交任务

Executors框架二 ScheduledThreadPoolExecutor线程池

大数据:Spark Standalone 集群调度如何创建分配Executors的资源

Java线程池详解

深入分析java线程池的实现原理