Queue中的Java线程,一旦完成任务,需要休眠
Posted
技术标签:
【中文标题】Queue中的Java线程,一旦完成任务,需要休眠【英文标题】:Java threads in Queue, once done with task, need to sleep 【发布时间】:2015-03-17 12:58:40 【问题描述】:我有以下案例要建模
程序首先根据接收到的参数查询数据库并了解要运行的任务数量。
定义了一些固定最大线程数的线程队列来执行任务。每个任务都会启动一个流程,该流程可以具有不同的配置并且可能需要不同的时间。任务完成后,每个任务都有可配置的睡眠时间。
一旦任务休眠,它就不能阻塞执行队列中的某个位置。执行队列必须继续准备好执行的任务
我发现由于某种原因很难编码(主要是由于最后一个要求) 任何帮助将不胜感激
谢谢
【问题讨论】:
为什么不能使用一些ExecutorService
,例如Executors.newCachedThreadPool(...)
或Executors.newFixedThreadPool(...)
?
这还不够。假设我使用具有固定数量线程的固定线程池,例如 10 个。现在假设我有 100 个任务要重复执行。一旦任务结束并且我得到一个对象(可调用),我需要确定这个任务是否需要休眠以及持续多长时间。例如,假设我需要在其中一项任务上休眠 3 分钟,然后再将其提交到执行队列(具有 10 个线程的任务)。当此任务休眠时,10 个执行线程必须继续运行队列中的其他任务。重点是 - 任务的睡眠时间必须在主执行队列之外完成。
任务运行业务逻辑,一旦完成,其他任务就会在队列中占据一席之地。一旦任务过度休眠,它就会重新提交到主执行队列。
您可以将Thread.sleep(...)
添加到您的Callable#call(...)
正文中吗?
嗯,sleep
将使池中的一个线程进入睡眠状态,该线程正在为该任务提供服务。以您为例,10 个线程之一将休眠。您仍然可以向池提交新任务。但请注意,如果您提交的速度比任务完成的速度快并睡过头,您最终会耗尽内存。
【参考方案1】:
这是用于说明计划的重新提交者的冗长但直截了当的代码,我没有测试过:)
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.*;
interface Repeatable
boolean shouldBeRepeated();
/**
* @return how long to sleep
*/
long delayBeforeRepeat();
/**
* @return "initial" state of this task instance, so this state can be resubmitted for repeated execution
*/
BusinessTask reset();
/**
* Whatever suits your business logic
*/
interface BusinessTask extends Callable<Repeatable>
class BusinessTaskCompletionData
final BusinessTask businessTask;
/**
* Timestamp when this task should be resubmitted
*/
final long nextSubmitTime;
BusinessTaskCompletionData(BusinessTask businessTask, long nextSubmitTime)
this.businessTask = businessTask;
this.nextSubmitTime = nextSubmitTime;
class TaskResusltsConsumer implements Runnable
private final CompletionService<Repeatable> completionService;
private final Deque<BusinessTaskCompletionData> completedTasks;
TaskResusltsConsumer(ExecutorService executor, Deque<BusinessTaskCompletionData> completedTasks)
this.completedTasks = completedTasks;
completionService = new ExecutorCompletionService<>(executor);
@Override
public void run()
while (true)
try
Future<Repeatable> completedBusinessTask = completionService.take();
Repeatable repeatable = completedBusinessTask.get();
if (repeatable.shouldBeRepeated())
completedTasks.add(new BusinessTaskCompletionData(repeatable.reset(),
System.currentTimeMillis() + repeatable.delayBeforeRepeat()));
catch (ExecutionException | InterruptedException ie)
// handle somehow
class TasksSupplier implements Runnable
private final Deque<BusinessTaskCompletionData> completedTasks;
private final ExecutorService executor;
TasksSupplier(Deque<BusinessTaskCompletionData> completedTasks, ExecutorService executor)
this.completedTasks = completedTasks;
this.executor = executor;
@Override
public void run()
while (true)
BusinessTask t = getTaskSomehow();
executor.submit(getTaskSomehow());
private BusinessTask getTaskSomehow()
// implement
return null;
/**
* Actual implementation of logic to obtain 'initial state' of task to repeat and repeat schedule
*/
class BusinessData implements Repeatable
// whatever
public class SOTest
public static void main(String[] args)
final LinkedList<BusinessTaskCompletionData> tasksToRepeat = new LinkedList<>();
// workers pool
final ExecutorService workersPool = Executors.newFixedThreadPool(10);
// controllers pool: 1 thread for supplier, the other for results consumer
final ExecutorService controllersPool = Executors.newFixedThreadPool(2);
controllersPool.submit(new TasksSupplier(tasksToRepeat, workersPool));
controllersPool.submit(new TaskResusltsConsumer(workersPool, tasksToRepeat));
// resubmitter scheduled pool
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
scheduledExecutor.scheduleWithFixedDelay(new Runnable()
@Override
public void run()
long now = System.currentTimeMillis();
Iterator<BusinessTaskCompletionData> it = tasksToRepeat.iterator();
while (it.hasNext())
BusinessTaskCompletionData data = it.next();
if (data.nextSubmitTime >= now)
workersPool.submit(data.businessTask);
it.remove();
,
// initial delay of 1 sec
1000,
// periodic delay of 1 sec
1000,
TimeUnit.MILLISECONDS
);
【讨论】:
以上是关于Queue中的Java线程,一旦完成任务,需要休眠的主要内容,如果未能解决你的问题,请参考以下文章