一旦他分配的任务之一因任何原因失败,Java 就会停止执行程序服务

Posted

技术标签:

【中文标题】一旦他分配的任务之一因任何原因失败,Java 就会停止执行程序服务【英文标题】:Java stop executor service once one of his assigned tasks fails for any reason 【发布时间】:2020-03-06 13:11:07 【问题描述】:

我需要某种服务,以 1 秒的间隔同时运行几个任务,持续 1 分钟。

如果其中一项任务失败,我想停止服务以及与它一起运行的每个任务,并显示出现问题的某种指示符,否则,如果一分钟后一切顺利,服务将停止并显示所有指示符进展顺利。

例如,我有两个函数:

Runnable task1 = ()->
      int num = Math.rand(1,100);
      if (num < 5)
          throw new Exception("something went wrong with this task,terminate");
      


Runnable task2 = ()->
      int num = Math.rand(1,100)
      return num < 50;




ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
task1schedule = scheduledExecutorService.scheduleAtFixedRate(task1, 1, 60, TimeUnit.SECONDS);
task2schedule = scheduledExecutorService.scheduleAtFixedRate(task2, 1, 60, TimeUnit.SECONDS);

if (!task1schedule || !task2schedule) scheduledExecutorService.shutdown();

关于我应该如何解决这个问题并使事情尽可能通用的任何想法?

【问题讨论】:

除了实际问题外,Math.rand 不是内置 API。 Runnable 的实现必须具有 void run 定义。在提供的上下文中,task1/2schedule 的类型将是 ScheduledFuture&lt;?&gt;。转到实际问题,如何使用awaitTermination?你可以用scheduledExecutorService.awaitTermination(1,TimeUnit.MINUTES); 来做。或者,如何检查是否有任何任务在正常完成之前被取消:if (task1schedule.isCancelled() || task2schedule.isCancelled()) scheduledExecutorService.shutdown(); 安排任务每分钟重复是没有意义的,但是说,如果“一分钟后一切顺利”,您想停止任务。由于您在任何一种情况下都将停止执行程序,因此安排一个在一分钟后关闭执行程序的任务是微不足道的。期货确实已经表明出了什么问题。你没有说,你想要什么其他类型的指标。 【参考方案1】:

这个想法是任务正在推送到一个公共对象TaskCompleteEvent。如果他们推送错误,调度程序将停止,所有任务都将停止。

您可以在地图“错误”和“成功”中查看每个任务迭代的结果。

public class SchedulerTest 

    @Test
    public void scheduler() throws InterruptedException 
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        TaskCompleteEvent taskCompleteEvent = new TaskCompleteEvent(scheduledExecutorService);
        Runnable task1 = () -> 
            int num = new Random().nextInt(100);
            if (num < 5) 
                taskCompleteEvent.message("task1-"+UUID.randomUUID().toString(), "Num "+num+" was obatined. Breaking all the executions.", true);
            
        ;
        Runnable task2 = () -> 
            int num = new Random().nextInt(100);
            taskCompleteEvent.message("task2-"+UUID.randomUUID().toString(), num < 50, false);
        ;
        scheduledExecutorService.scheduleAtFixedRate(task1, 0, 1, TimeUnit.SECONDS);
        scheduledExecutorService.scheduleAtFixedRate(task2, 0, 1, TimeUnit.SECONDS);
        scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
        System.out.println("Success: "+taskCompleteEvent.getSuccess());
        System.out.println("Errors: "+taskCompleteEvent.getErrors());
        System.out.println("Went well?: "+taskCompleteEvent.getErrors().isEmpty());
    

    public static class TaskCompleteEvent 

        private final ScheduledExecutorService scheduledExecutorService;
        private final Map<String, Object> errors = new LinkedHashMap<>();
        private final Map<String, Object> success = new LinkedHashMap<>();

        public TaskCompleteEvent(ScheduledExecutorService scheduledExecutorService) 
            this.scheduledExecutorService = scheduledExecutorService;
        

        public synchronized void message(String id, Object response, boolean error) 
            if (error) 
                errors.put(id, response);
                scheduledExecutorService.shutdown();
             else 
                success.put(id, response);
            
        

        public synchronized Map<String, Object> getErrors() 
            return errors;
        

        public synchronized Map<String, Object> getSuccess() 
            return success;
        

    


【讨论】:

【参考方案2】:

你只需要添加一个额外的任务,它的工作就是监控所有其他正在运行的任务——当任何被监控的任务失败时,他们需要设置一个刺客可以检查的信号量(标志)。

    ScheduledExecutorService executor = (ScheduledExecutorService) Executors.newScheduledThreadPool(2);

    // INSTANTIATE THE REMOTE-FILE-MONITOR:
    RemoteFileMonitor monitor = new RemoteFileMonitor(remotesource, localtarget);

    // THIS TimerTask PERIODICALLY TRIGGERS THE RemoteFileMonitor: 
    TimerTask remote = new TimerTask() 

        // RUN FORREST... RUN !
        public void run() 

            try  

                kae.trace("TimerTask::run() --> Calling RemoteFileMonitor.check()");
                monitor.check();

             catch (Exception ex) 

                // NULL TRAP: ALLOWS US TO CONTINUE AND RETRY:

            

        

    ;

    // THIS TimerTask PERIODICALLY TRIES TO KILL THE REMOTE-FILE-MONITOR:
    TimerTask assassin = new TimerTask() 

        // WHERE DO BAD FOLKS GO WHEN THEY DIE ? 
        private final LocalDateTime death = LocalDateTime.now().plus(ConfigurationOptions.getPollingCycleTime(), ChronoUnit.MINUTES);

        // RUN FORREST... RUN !
        public void run() 

            // IS THERE LIFE AFTER DEATH ???
            if (LocalDateTime.now().isAfter(death)) 

                // THEY GO TO A LAKE OF FIRE AND FRY:
                kae.error(ReturnCode.MONITOR_POLLING_CYCLE_EXCEEDED);                   

            

        

    ;

    // SCHEDULE THE PERIODIC EXECUTION OF THE RemoteFileMonitor: (remote --> run() monitor --> check())
    executor.scheduleAtFixedRate(remote, delay, interval, TimeUnit.MINUTES);

    // SCHEDULE PERIODIC ASSASSINATION ATTEMPTS AGAINST THE RemoteFileMonitor: (assassin --> run() --> after death --> die())
    executor.scheduleAtFixedRate(assassin, delay, 60L, TimeUnit.SECONDS);

    // LOOP UNTIL THE MONITOR COMPLETES:
    do 

        try 

            // I THINK I NEED A NAP:
            Thread.sleep(interval * 10);                

         catch (InterruptedException e) 

            // FAIL && THEN cleanexit();
            kae.error(ReturnCode.MONITORING_ERROR, "Monitoring of the XXXXXX-Ingestion site was interrupted");

        

        // NOTE: THE MONITOR IS SET TO 'FINISHED' WHEN THE DONE-File IS DELIVERED AND RETRIEVED:
     while (monitor.isNotFinished());

    // SHUTDOWN THE MONITOR TASK:
    executor.shutdown();

【讨论】:

TimerTaskScheduledExecutorService完全无关;它恰好实现了Runnable。此外,安排定期任务是没有意义的,只是为了检查是否已达到特定时间 (ConfigurationOptions.getPollingCycleTime())。你有一个ScheduledExecutorService,所以你可以告诉它在所需的时间安排任务。 我使用的示例中的实现是如果任务没有完成,则在一段时间后杀死正在执行的任务。用例是:如果远程服务器在 2 小时内没有删除文件 - 终止任务。这就是 OP 要求的。 您阅读并理解我的评论了吗?代码做什么并不重要,它无缘无故地使用了一个不鼓励的类,只需将TimerTask 替换为Runnable 即可解决问题,而无需更改代码的功能。此外,只需使用executor.schedule(assassin, ConfigurationOptions.getPollingCycleTime(), ChronoUnit.MINUTES);,它将在所需时间运行一次,因此,if(LocalDateTime.now().isAfter(death)) 检查已过时。同样,除了使代码更简单、更高效之外,它并没有改变代码的作用。

以上是关于一旦他分配的任务之一因任何原因失败,Java 就会停止执行程序服务的主要内容,如果未能解决你的问题,请参考以下文章

YARN 重启失败原因之一

如果 Liquibase 因更改集迁移失败而失败,我的 csv 数据有啥问题...原因:java.lang.NullPointerException

在Hadoop集群中,任务分配到每个节点上的传统方法是啥,怎么实现随机分配,均衡分配........

Images.xcassets:1:1: Distill 因未知原因而失败

malloc函数分配内存失败的常见原因

我应该如何重新打开失败的 WCF 频道?