ExecutorService - 以特定时间限制执行每个任务
Posted
技术标签:
【中文标题】ExecutorService - 以特定时间限制执行每个任务【英文标题】:ExecutorService - Execute Each Task with a Specific Time Limit 【发布时间】:2019-04-11 07:00:12 【问题描述】:我正在创建一个ExecutorService
来执行一些在正常情况下预计需要一分钟才能完成的任务,但在任何情况下都不应允许运行超过两分钟从任务开始 /em>。
我的代码如下:
ExecutorService executorService = Executors.newFixedThreadPool(10);
ArrayList<Future<?>> futuresList = new ArrayList<Future<?>>();
for (String singleTask: taskList)
futuresList.add(executorService.submit( new Runnable()
@Override
public void run()
try
performTask(p1, singleTask, p3);
catch (IOException | InterruptedException | ExecutionException e)
// TODO Auto-generated catch block
e.printStackTrace();
));
for(Future<?> future : futures)
future.get(120, TimeUnit.SECONDS);
这将阻塞直到指定超时,然后继续。我的问题如下:
1) 如果task1
阻塞了两分钟,task2
也阻塞了两分钟 - 那么 task2
将“阻塞”总共 4 分钟(因为直到 @ 987654327@ 完成阻塞) - 即使两个任务同时提交并开始执行
2) 如果我提交的任务超过 10 个,任务 11+ 可能永远不会阻塞所需的时间,如果之前的任务在 11 日调用 future.get(120, TimeUnit.SECONDS);
时还没有完成任务
我的目标是让每个单独的任务最多执行两分钟,不管列表中有多少任务,也不管它之前或之后有多少任务。
谢谢
【问题讨论】:
你可以使用SingleThreadedExecutor
@Ivan,他不能 - 他仍然需要线程池来执行任务。
你打算如何处理运行超过超时时间的任务?您将如何取消任务或将其超时?看起来你需要在performTask
中有一些自定义超时挂钩之王。
@tsolakp,我们的系统中运行着同样的事情——它需要一个自定义的工作线程实现,以及一个单独的看门狗线程,在指定的超时后唤醒并尝试杀死线程,如果它挂了。我觉得要添加到答案中是一件很长的事情。你觉得我的描述就够了?
@M.普罗霍罗夫。同意。 OP 很可能需要自定义超时或线程终止实现。
【参考方案1】:
好的,我不确定我的问题是否被完全理解,但我会尝试用我想出的解决方案来回答我自己的问题(这可能有助于向其他人澄清问题)。我认为@Peter Lawrey 回避了这个答案,但答案太短而无法确定。
int timeLimitOfIndividualTaskInSeconds = 120;
int fixedThreadPoolCount = 10;
ExecutorService executorService = Executors.newFixedThreadPool(fixedThreadPoolCount);
ArrayList<Future<?>> futuresList = new ArrayList<Future<?>>();
for (String singleTask: taskList)
futuresList.add(executorService.submit( new Runnable()
@Override
public void run()
try
executeTask(singleTask);
catch (IOException | InterruptedException | ExecutionException e)
e.printStackTrace();
));
long beforeTimeInMilli = System.currentTimeMillis();
long beforeTimeInSeconds = TimeUnit.MILLISECONDS.toSeconds(beforeTimeInMilli);
int counter = 0;
long timeoutInSeconds = timeLimitOfIndividualTaskInSeconds;
for(Future<?> future : futuresList)
if (counter % fixedThreadPoolCount == 0)
// resets time limit to initial limit since next batch of tasks are beginning to execute
timeoutInSeconds = timeLimitOfIndividualTaskInSeconds;
try
future.get(timeoutInSeconds, TimeUnit.SECONDS);
catch (Exception e)
e.printStackTrace();
future.cancel(true); //stops the underlying task
counter++;
long afterTimeInMilli = System.currentTimeMillis();
long afterTimeInSeconds = TimeUnit.MILLISECONDS.toSeconds(afterTimeInMilli);
long taskDurationInSeconds = afterTimeInSeconds - beforeTimeInSeconds;
timeoutInSeconds = timeoutInSeconds - taskDurationInSeconds;
这保证了两件事:
1)所有已提交并同时开始执行的任务(即“同一批次”)将运行 max em> 120 秒(但如果任何任务在 120 秒之前完成,它将不会继续阻塞)
2) 同一批次中的先前任务不会导致该批次中的后续任务执行超过120秒的时间限制(因为我们从后续任务的超时值中扣除了先前任务的执行时间)
我发现这个简单而优雅的解决方案 - 当然,我很高兴收到任何能够改进或评论此解决方案的人的意见。
【讨论】:
【参考方案2】:ExecutorService#invokeAll
可能是这里的关键。
这个问题有点难以理解(甚至可能因为你试图准确地描述它?;-))。因此,我创建了一个示例,试图围绕它展开思考。即使这不是您的意图,也许您可以解释这与您的目标有何不同,以便澄清问题或其他人可以写出更好的答案。
该示例将任务创建为Callable
对象,并放入列表中。这样的列表可以传递给ExecutorService#invokeAll
。 (在您的情况下,您可以使用Executors#callable
从您的Runnable
任务创建这些实例)。已创建 5 个任务。默认情况下,每个任务需要 2000 毫秒来执行。任务"C"
是奇数,耗时8000 毫秒。最大执行时间应为 5000 毫秒。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceLimitTaskTime
private static Map<String, Long> taskSubmitMs =
new ConcurrentHashMap<String, Long>();
private static Map<String, Long> taskStartMs =
new ConcurrentHashMap<String, Long>();
private static Map<String, Long> taskFinishedMs =
new ConcurrentHashMap<String, Long>();
public static void main(String[] args) throws Exception
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<String> tasks = Arrays.asList("A", "B", "C", "D", "E");
List<Callable<String>> callables = new ArrayList<Callable<String>>();
for (String task : tasks)
taskSubmitMs.put(task, System.currentTimeMillis());
callables.add(new Callable<String>()
@Override
public String call()
taskStartMs.put(task, System.currentTimeMillis());
long durationMs = 2000;
if (task.equals("C"))
durationMs = 8000;
performTask(task, durationMs);
if (!Thread.currentThread().isInterrupted())
taskFinishedMs.put(task, System.currentTimeMillis());
return task;
);
List<Future<String>> futures =
executorService.invokeAll(callables, 5000, TimeUnit.MILLISECONDS);
for (Future<String> future : futures)
try
future.get();
catch (CancellationException e)
System.out.println("One task was cancelled");
for (String task : tasks)
Long submitMs = taskSubmitMs.get(task);
Long startMs = taskStartMs.get(task);
Long finishedMs = taskFinishedMs.get(task);
if (finishedMs != null)
long waitMs = startMs - submitMs;
long runMs = finishedMs - startMs;
long totalMs = finishedMs - submitMs;
System.out.printf(
"Task %-3s waited %5d ms and ran %5d ms, total %5d ms\n",
task, waitMs, runMs, totalMs);
else
System.out.printf(
"Task %-3s was cancelled\n", task);
private static void performTask(String task, long durationMs)
System.out.println("Executing " + task);
try
Thread.sleep(durationMs);
catch (InterruptedException e)
Thread.currentThread().interrupt();
System.out.println("Executing " + task + " DONE");
最后打印的摘要显示了这个结果:
Task A waited 16 ms and ran 2002 ms, total 2018 ms
Task B waited 3 ms and ran 2002 ms, total 2005 ms
Task C was cancelled
Task D waited 2005 ms and ran 2000 ms, total 4005 ms
Task E waited 2005 ms and ran 2000 ms, total 4005 ms
这表明
立即启动的任务运行了 2000 毫秒 必须等待其他任务的任务也运行了 2000 毫秒(但总共运行了 4000 毫秒) 耗时过长的任务在 5000 毫秒后被取消【讨论】:
【参考方案3】:您可以使用 System.currentTimeMillis();然后添加最大时间,例如 120_000 毫秒。 当您等待时,您减去当前时间。 IE。您只能等到达到最长时间。
【讨论】:
简短的解释将有助于理解此答案如何解决 OP 中的两个问题。谢谢。以上是关于ExecutorService - 以特定时间限制执行每个任务的主要内容,如果未能解决你的问题,请参考以下文章
Java ExecutorService 暂停/恢复特定线程
Symfony2 - 添加控制器安全性以限制用户对特定博客的访问