Queue中的Java线程,一旦完成任务,需要休眠

Posted

技术标签:

【中文标题】Queue中的Java线程,一旦完成任务,需要休眠【英文标题】:Java threads in Queue, once done with task, need to sleep 【发布时间】:2015-03-17 12:58:40 【问题描述】:

我有以下案例要建模

程序首先根据接收到的参数查询数据库并了解要运行的任务数量。

定义了一些固定最大线程数的线程队列来执行任务。每个任务都会启动一个流程,该流程可以具有不同的配置并且可能需要不同的时间。任务完成后,每个任务都有可配置的睡眠时间。

一旦任务休眠,它就不能阻塞执行队列中的某个位置。执行队列必须继续准备好执行的任务

我发现由于某种原因很难编码(主要是由于最后一个要求) 任何帮助将不胜感激

谢谢

【问题讨论】:

为什么不能使用一些ExecutorService,例如Executors.newCachedThreadPool(...)Executors.newFixedThreadPool(...) 这还不够。假设我使用具有固定数量线程的固定线程池,例如 10 个。现在假设我有 100 个任务要重复执行。一旦任务结束并且我得到一个对象(可调用),我需要确定这个任务是否需要休眠以及持续多长时间。例如,假设我需要在其中一项任务上休眠 3 分钟,然后再将其提交到执行队列(具有 10 个线程的任务)。当此任务休眠时,10 个执行线程必须继续运行队列中的其他任务。重点是 - 任务的睡眠时间必须在主执行队列之外完成。 任务运行业务逻辑,一旦完成,其他任务就会在队列中占据一席之地。一旦任务过度休眠,它就会重新提交到主执行队列。 您可以将Thread.sleep(...) 添加到您的Callable#call(...) 正文中吗? 嗯,sleep 将使池中的一个线程进入睡眠状态,该线程正在为该任务提供服务。以您为例,10 个线程之一将休眠。您仍然可以向池提交新任务。但请注意,如果您提交的速度比任务完成的速度快并睡过头,您最终会耗尽内存。 【参考方案1】:

这是用于说明计划的重新提交者的冗长但直截了当的代码,我没有测试过:)

import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.*;

interface Repeatable 
    boolean shouldBeRepeated();
    /**
     * @return how long to sleep
     */
    long delayBeforeRepeat();

    /**
     * @return "initial" state of this task instance, so this state can be resubmitted for repeated execution
     */
    BusinessTask reset();


/**
 * Whatever suits your business logic
 */
interface BusinessTask extends Callable<Repeatable> 


class BusinessTaskCompletionData 
    final BusinessTask businessTask;

    /**
     * Timestamp when this task should be resubmitted
     */
    final long nextSubmitTime;

    BusinessTaskCompletionData(BusinessTask businessTask, long nextSubmitTime) 
        this.businessTask = businessTask;
        this.nextSubmitTime = nextSubmitTime;
    


class TaskResusltsConsumer implements Runnable 

    private final CompletionService<Repeatable> completionService;
    private final Deque<BusinessTaskCompletionData> completedTasks;

    TaskResusltsConsumer(ExecutorService executor, Deque<BusinessTaskCompletionData> completedTasks) 
        this.completedTasks = completedTasks;
        completionService = new ExecutorCompletionService<>(executor);
    

    @Override
    public void run() 
        while (true) 
            try 
                Future<Repeatable> completedBusinessTask = completionService.take();
                Repeatable repeatable = completedBusinessTask.get();
                if (repeatable.shouldBeRepeated()) 
                    completedTasks.add(new BusinessTaskCompletionData(repeatable.reset(),
                            System.currentTimeMillis() + repeatable.delayBeforeRepeat()));
                
             catch (ExecutionException | InterruptedException ie) 
                // handle somehow
            
        
    


class TasksSupplier implements Runnable 

    private final Deque<BusinessTaskCompletionData> completedTasks;
    private final ExecutorService executor;

    TasksSupplier(Deque<BusinessTaskCompletionData> completedTasks, ExecutorService executor) 
        this.completedTasks = completedTasks;
        this.executor = executor;
    

    @Override
    public void run() 
        while (true) 
            BusinessTask t = getTaskSomehow();
            executor.submit(getTaskSomehow());
        
    

    private BusinessTask getTaskSomehow() 
        // implement
        return null;
    


/**
 * Actual implementation of logic to obtain 'initial state' of task to repeat and repeat schedule
 */
class BusinessData implements Repeatable 
    // whatever


public class SOTest 

    public static void main(String[] args) 

        final LinkedList<BusinessTaskCompletionData> tasksToRepeat = new LinkedList<>();

        // workers pool
        final ExecutorService workersPool = Executors.newFixedThreadPool(10);

        // controllers pool: 1 thread for supplier, the other for results consumer
        final ExecutorService controllersPool = Executors.newFixedThreadPool(2);
        controllersPool.submit(new TasksSupplier(tasksToRepeat, workersPool));
        controllersPool.submit(new TaskResusltsConsumer(workersPool, tasksToRepeat));

        // resubmitter scheduled pool
        ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutor.scheduleWithFixedDelay(new Runnable() 
                @Override
                public void run() 
                    long now = System.currentTimeMillis();
                    Iterator<BusinessTaskCompletionData> it = tasksToRepeat.iterator();
                    while (it.hasNext()) 
                        BusinessTaskCompletionData data = it.next();
                        if (data.nextSubmitTime >= now) 
                            workersPool.submit(data.businessTask);
                            it.remove();
                        
                    
                
            ,
            // initial delay of 1 sec
            1000,
            // periodic delay of 1 sec
            1000,
            TimeUnit.MILLISECONDS
        );
    


【讨论】:

以上是关于Queue中的Java线程,一旦完成任务,需要休眠的主要内容,如果未能解决你的问题,请参考以下文章

java线程如何一分钟执行一次

Java 线程池

第70题JAVA高级技术-多线程4(龟兔赛跑-休眠线程)

Java中的并发 - 将任务委派给工作线程,我做得对吗?

线程通讯

ThreadPoolExecutor中的核心线程数、最大线程数区别详解