ExecutorService - 以特定时间限制执行每个任务

Posted

技术标签:

【中文标题】ExecutorService - 以特定时间限制执行每个任务【英文标题】:ExecutorService - Execute Each Task with a Specific Time Limit 【发布时间】:2019-04-11 07:00:12 【问题描述】:

我正在创建一个ExecutorService 来执行一些在正常情况下预计需要一分钟才能完成的任务,但在任何情况下都不应允许运行超过两分钟从任务开始 /em>。

我的代码如下:

ExecutorService executorService = Executors.newFixedThreadPool(10);
ArrayList<Future<?>> futuresList = new ArrayList<Future<?>>();



for (String singleTask: taskList)  
                futuresList.add(executorService.submit( new Runnable()      
                       @Override
                       public void run()
                        try 
                            performTask(p1, singleTask, p3);
                         catch (IOException | InterruptedException | ExecutionException e) 
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                
              
         ));       

    



for(Future<?> future : futures) 
    future.get(120, TimeUnit.SECONDS); 

这将阻塞直到指定超时,然后继续。我的问题如下:

1) 如果task1 阻塞了两分钟,task2 也阻塞了两分钟 - 那么 task2 将“阻塞”总共 4 分钟(因为直到 @ 987654327@ 完成阻塞) - 即使两个任务同时提交并开始执行

2) 如果我提交的任务超过 10 个,任务 11+ 可能永远不会阻塞所需的时间,如果之前的任务在 11 日调用 future.get(120, TimeUnit.SECONDS); 时还没有完成任务

我的目标是让每个单独的任务最多执行两分钟,不管列表中有多少任务,也不管它之前或之后有多少任务。

谢谢

【问题讨论】:

你可以使用SingleThreadedExecutor @Ivan,他不能 - 他仍然需要线程池来执行任务。 你打算如何处理运行超过超时时间的任务?您将如何取消任务或将其超时?看起来你需要在performTask 中有一些自定义超时挂钩之王。 @tsolakp,我们的系统中运行着同样的事情——它需要一个自定义的工作线程实现,以及一个单独的看门狗线程,在指定的超时后唤醒并尝试杀死线程,如果它挂了。我觉得要添加到答案中是一件很长的事情。你觉得我的描述就够了? @M.普罗霍罗夫。同意。 OP 很可能需要自定义超时或线程终止实现。 【参考方案1】:

好的,我不确定我的问题是否被完全理解,但我会尝试用我想出的解决方案来回答我自己的问题(这可能有助于向其他人澄清问题)。我认为@Peter Lawrey 回避了这个答案,但答案太短而无法确定。

        int timeLimitOfIndividualTaskInSeconds = 120;
        int fixedThreadPoolCount = 10;

        ExecutorService executorService = Executors.newFixedThreadPool(fixedThreadPoolCount);
        ArrayList<Future<?>> futuresList = new ArrayList<Future<?>>();

        for (String singleTask: taskList) 

            futuresList.add(executorService.submit( new Runnable()      
                @Override
                public void run()
                    try 
                        executeTask(singleTask);
                     catch (IOException | InterruptedException | ExecutionException e) 
                        e.printStackTrace();
                    
                
            ));        

        

        long beforeTimeInMilli = System.currentTimeMillis();
        long beforeTimeInSeconds = TimeUnit.MILLISECONDS.toSeconds(beforeTimeInMilli);
        int counter = 0;

        long timeoutInSeconds = timeLimitOfIndividualTaskInSeconds;

        for(Future<?> future : futuresList) 
            if (counter % fixedThreadPoolCount == 0) 

                // resets time limit to initial limit since next batch of tasks are beginning to execute
                timeoutInSeconds = timeLimitOfIndividualTaskInSeconds;
            

            try 
                future.get(timeoutInSeconds, TimeUnit.SECONDS);
             catch (Exception e)
                e.printStackTrace();
                future.cancel(true); //stops the underlying task
            

            counter++;

            long afterTimeInMilli = System.currentTimeMillis();
            long afterTimeInSeconds = TimeUnit.MILLISECONDS.toSeconds(afterTimeInMilli);

            long taskDurationInSeconds = afterTimeInSeconds - beforeTimeInSeconds;
            timeoutInSeconds = timeoutInSeconds - taskDurationInSeconds;

           

这保证了两件事:

1)所有已提交同时开始执行的任务(即“同一批次”)将运行 max em> 120 秒(但如果任何任务在 120 秒之前完成,它将不会继续阻塞)

2) 同一批次中的先前任务不会导致该批次中的后续任务执行超过120秒的时间限制(因为我们从后续任务的超时值中扣除了先前任务的执行时间)

我发现这个简单而优雅的解决方案 - 当然,我很高兴收到任何能够改进或评论此解决方案的人的意见。

【讨论】:

【参考方案2】:

ExecutorService#invokeAll 可能是这里的关键。

这个问题有点难以理解(甚至可能因为你试图准确地描述它?;-))。因此,我创建了一个示例,试图围绕它展开思考。即使这不是您的意图,也许您可​​以解释这与您的目标有何不同,以便澄清问题或其他人可以写出更好的答案。

该示例将任务创建为Callable 对象,并放入列表中。这样的列表可以传递给ExecutorService#invokeAll。 (在您的情况下,您可以使用Executors#callable 从您的Runnable 任务创建这些实例)。已创建 5 个任务。默认情况下,每个任务需要 2000 毫秒来执行。任务"C" 是奇数,耗时8000 毫秒。最大执行时间应为 5000 毫秒。

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceLimitTaskTime

    private static Map<String, Long> taskSubmitMs = 
        new ConcurrentHashMap<String, Long>();
    private static Map<String, Long> taskStartMs = 
        new ConcurrentHashMap<String, Long>();
    private static Map<String, Long> taskFinishedMs = 
        new ConcurrentHashMap<String, Long>();

    public static void main(String[] args) throws Exception
    
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<String> tasks = Arrays.asList("A", "B", "C", "D", "E");

        List<Callable<String>> callables = new ArrayList<Callable<String>>();
        for (String task : tasks)
        
            taskSubmitMs.put(task, System.currentTimeMillis());

            callables.add(new Callable<String>()
            
                @Override
                public String call()
                
                    taskStartMs.put(task, System.currentTimeMillis());

                    long durationMs = 2000;
                    if (task.equals("C"))
                    
                        durationMs = 8000;
                    

                    performTask(task, durationMs);
                    if (!Thread.currentThread().isInterrupted())
                    
                        taskFinishedMs.put(task, System.currentTimeMillis());
                    
                    return task;
                
            );
        

        List<Future<String>> futures = 
            executorService.invokeAll(callables, 5000, TimeUnit.MILLISECONDS);

        for (Future<String> future : futures)
        
            try
            
                future.get();
            
            catch (CancellationException e) 
            
                System.out.println("One task was cancelled");
            
        

        for (String task : tasks)
        
            Long submitMs = taskSubmitMs.get(task);
            Long startMs = taskStartMs.get(task);
            Long finishedMs = taskFinishedMs.get(task);

            if (finishedMs != null)
            
                long waitMs = startMs - submitMs;
                long runMs = finishedMs - startMs;
                long totalMs = finishedMs - submitMs;
                System.out.printf(
                    "Task %-3s waited %5d ms and ran %5d ms, total %5d ms\n", 
                    task, waitMs, runMs, totalMs);
            
            else
            
                System.out.printf(
                    "Task %-3s was cancelled\n", task);

            
        

    

    private static void performTask(String task, long durationMs)
    
        System.out.println("Executing " + task);
        try
        
            Thread.sleep(durationMs);
        
        catch (InterruptedException e)
        
            Thread.currentThread().interrupt();
        
        System.out.println("Executing " + task + " DONE");
    


最后打印的摘要显示了这个结果:

Task A   waited    16 ms and ran  2002 ms, total  2018 ms
Task B   waited     3 ms and ran  2002 ms, total  2005 ms
Task C   was cancelled
Task D   waited  2005 ms and ran  2000 ms, total  4005 ms
Task E   waited  2005 ms and ran  2000 ms, total  4005 ms

这表明

立即启动的任务运行了 2000 毫秒 必须等待其他任务的任务也运行了 2000 毫秒(但总共运行了 4000 毫秒) 耗时过长的任务在 5000 毫秒后被取消

【讨论】:

【参考方案3】:

您可以使用 System.currentTimeMillis();然后添加最大时间,例如 120_000 毫秒。 当您等待时,您减去当前时间。 IE。您只能等到达到最长时间。

【讨论】:

简短的解释将有助于理解此答案如何解决 OP 中的两个问题。谢谢。

以上是关于ExecutorService - 以特定时间限制执行每个任务的主要内容,如果未能解决你的问题,请参考以下文章

Java ExecutorService 暂停/恢复特定线程

限制函数参数以允许特定值

Symfony2 - 添加控制器安全性以限制用户对特定博客的访问

Android线程管理之ExecutorService线程池

Java ExecutorService 四种线程池

Java ExecutorService - 任务/可调用不取消/中断