ThreadPoolTaskScheduler实现动态管理定时任务

Posted 流云一号

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolTaskScheduler实现动态管理定时任务相关的知识,希望对你有一定的参考价值。

        最近,有个项目有需要用到定时任务,所以做了一个动态管理定时任务的模块。本文将从项目背景、需求、选型、思路、具体实现等方面展开介绍。

        背景:有个支付类的项目,中间会产生一些中间态的订单,需要有个定时任务轮询确认订单状态。该类项目体量较小,单节点部署,客户比较多,需要简单快速的部署、维护。

        需求:定时任务能够通过表达式灵活指定执行计划,并支持动态启动、关闭、修改。定时任务模块最好和业务包在一个jar包内,部署简单。

        选型:说到定时任务,当下最火的当属xxl-job,本案为什么不采用xxl-job呢?不是因为它不够强大,是因为需要单独部署组件,并且需要建一系列相关的表,运维小哥哥不会弄或者嫌麻烦。基于上述原因,考虑采用Springboot自己的定时任务,常见的有两种实现方式,一种基于注解,如:

@Configuration      //1.主要用于标记配置类,兼备Component的效果。
@EnableScheduling   // 2.开启定时任务
public class SaticScheduleTask 
    
    @Scheduled(cron = "0/5 * * * * ?")
    private void configureTasks() 
        System.out.println("开始执行静态定时任务时间");
    

        另一种基于接口,主要代码如:

@Configuration      //1.主要用于标记配置类,兼备Component的效果。
@EnableScheduling   // 2.开启定时任务
public class DynamicScheduleTask implements SchedulingConfigurer 

    /**
     * 执行定时任务.
     */
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) 

        taskRegistrar.addTriggerTask(
                //1.添加任务内容(Runnable)
                () -> System.out.println("执行动态定时任务: " + LocalDateTime.now().toLocalTime()),
                //2.设置执行周期(Trigger)
                triggerContext -> 
                    String cron = "0/5 * * * * ?";//这个表达式可以写在配置文件里,也可以从数据库读取
                    //合法性校验
                    if (StringUtils.isEmpty(cron)) 
                        //这里写具体的业务代码
                    
                    //返回执行周期(Date)
                    return new CronTrigger(cron).nextExecutionTime(triggerContext);
                
        );
    

        但是上述两种都不太灵活,不能动态的启动、关闭和修改。最后,选择通过ThreadPoolTaskScheduler来实现动态管理定时任务。

        思路:

        1、ThreadPoolTaskScheduler可以实现任务调度,支持基于cron表达式的任务,其提交任务接口如下:

public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) 
        //...
    

        那么,定时任务业务实现类实现Runnable接口,用cron构造CronTrigger,就可以完成任务提交。提交任务的时候将回调对象ScheduledFuture记录下来,后面可以用它的cancel方法实现任务的关闭。

        2、在数据库建一张任务表,主要有任务编号、任务名称、调度计划表达式、任务状态等字段。在项目启动后,自动查询任务表,加载任务状态为启动状态的任务。

        3、写一个前端管理页面,主要用于查询展示定时任务,新增、启动、关闭和修改定时任务。通过管理页面对任务进行启动、关闭等操作时,一方面修改数据库中的状态,另一方面也要同步做启动或者关闭任务操作。

        具体实现:

        核心代码:

@Component
public class DynamicTimedTask implements ApplicationRunner 

    @Autowired
    TimedTaskService timedTaskService;

    private static final Logger logger = LoggerFactory.getLogger(DynamicTimedTask.class);

    /**
     * @Author yrd
     * @Description 项目启动后自动加载数据库里状态为启动的定时任务
     * @Date 10:40 2021/12/20
     * @Param [args]
     * @return void
     **/
    @Override
    public void run(ApplicationArguments args) throws Exception 
        //查询数据库中定时任务信息
        List<Map> timedTasks = timedTaskService.queryAllTimedTask();
        for (Map timedTask : timedTasks) 
            String taskId = timedTask.getString("taskid");//任务编号
            String cron = timedTask.getString("cron");//表达式
            String status = timedTask.getString("status");//任务状态
            if ("1".equals(status)) 
                startTask(taskId, cron);
            
        
    

    //接受任务的返回结果
    private ConcurrentHashMap<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>();

    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    //实例化一个线程池任务调度类,可以使用自定义的ThreadPoolTaskScheduler
    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() 
        ThreadPoolTaskScheduler schedulerPool = new ThreadPoolTaskScheduler();
        schedulerPool.setPoolSize(3);
        schedulerPool.setWaitForTasksToCompleteOnShutdown(true);
        schedulerPool.setAwaitTerminationSeconds(60);
        return schedulerPool;
    

    /**
     * 启动定时任务
     * @return
     */
    public boolean startTask(String taskId, String cron) 
        boolean flag = false;
        ScheduledFuture future = futureMap.get(taskId);
        if (future != null) 
            if (future.isCancelled()) 
                logger.info("任务【】已存在但是关闭状态!!!", taskId);
             else 
                logger.info("任务【】已存在且已启动!!!", taskId);
                return true;
            
        
        if (SpringBeanUtils.containsBean(taskId)) 
            future = threadPoolTaskScheduler.schedule((Runnable) SpringBeanUtils.getBean(taskId), new CronTrigger(cron));
            if (future != null)
                flag = true;
                futureMap.put(taskId, future);
                logger.info("任务【】启动成功!!!", taskId);
            else 
                logger.info("任务【】启动失败!!!", taskId);
            
         else 
            logger.info("任务【】未实现!!!", taskId);
        
        return flag;
    

    /**
     * 停止定时任务
     * @return
     */
    public boolean stopTask(String taskId) 
        boolean flag = false;
        ScheduledFuture future = futureMap.get(taskId);
        if (future == null) 
            logger.info("任务【】不在任务队列中!!!", taskId);
            flag = true;
         else 
            if (future.isCancelled()) 
                logger.info("任务【】已经是关闭状态!!!", taskId);
                flag = true;
             else 
                boolean cancel = future.cancel(true);
                if (cancel)
                    flag = true;
                    logger.info("任务【】关闭成功!!!", taskId);
                else 
                    logger.info("任务【】关闭失败!!!", taskId);
                
            
        
        return flag;
    

        说明:上述代码用到了通过bean的ID从应用上下文中获取bean实例,具体实现方法网上有很多介绍,本文不再详述。因此,具体任务的业务实现类bean的ID需要与任务ID一致,上述代码中startTask方法也对次作了判断。如图:

 

         任务管理页面对应的controller类:

@Controller
@RequestMapping("/timedTask")
public class TimedTaskManageController 

    @Autowired
    TimedTaskService timedTaskService;

    /**
     * @Author yrd
     * @Description 进入定时任务管理页面
     * @Date 10:37 2021/12/20
     * @Param []
     * @return java.lang.String
     **/
    @RequestMapping("/timedTaskManagePage")
    public String timedTaskManagePage() throws AppException 
        return "/timedTask/timedTaskManage";
    

    /**
     * @Author yrd
     * @Description 查询定时任务
     * @Date 10:38 2021/12/20
     * @Param [para]
     * @return java.util.List
     **/
    @RequestMapping("/queryTimedTask")
    @ResponseBody
    public List queryTimedTask(@RequestBody DataObject para) throws Exception 
        return timedTaskService.queryTimedTask(para);
    

    /**
     * @Author yrd
     * @Description 进入新增定时任务页面
     * @Date 10:38 2021/12/20
     * @Param []
     * @return java.lang.String
     **/
    @RequestMapping("/addTimedTaskPage")
    public String addTimedTaskPage() throws Exception 
        return "/timedTask/addTimedTask";
    

    /**
     * @Author yrd
     * @Description 根据任务编号检查定时任务
     * @Date 10:38 2021/12/20
     * @Param [taskId]
     * @return java.lang.String
     **/
    @RequestMapping("/checkTimedTask")
    @ResponseBody
    public String checkTimedTask(@RequestParam("taskId") String taskId) throws Exception 
        Map map = timedTaskService.queryTimedTaskById(taskId);
        JSONObject object = new JSONObject();
        if (map.isEmpty()) 
            if (SpringBeanUtils.containsBean(taskId)) 
                object.put("success", "true");
                object.put("msg", "");
             else 
                object.put("success", "false");
                object.put("msg", "该任务编号对应的任务未实现!");
            
         else 
            object.put("success", "false");
            object.put("msg", "任务编号重复!");
        
        return object.toString();
    

    /**
     * @Author yrd
     * @Description 新增定时任务
     * @Date 10:39 2021/12/20
     * @Param [request]
     * @return java.lang.String
     **/
    @RequestMapping("/addTimedTask")
    @ResponseBody
    public String addTimedTask(HttpServletRequest request) throws Exception 
        Map param = new HashMap();
        param.put("taskId", request.getParameter("taskId").toString());
        param.put("taskName", request.getParameter("taskName").toString());
        param.put("cron", request.getParameter("cron").toString());
        return timedTaskService.addTimedTask(param);
    

    /**
     * @Author yrd
     * @Description 进入修改定时任务页面
     * @Date 10:39 2021/12/20
     * @Param [taskId, model]
     * @return java.lang.String
     **/
    @RequestMapping("/editTimedTaskPage")
    public String editTimedTaskPage(@RequestParam("taskId") String taskId , Model model) throws Exception 
        Map map = timedTaskService.queryTimedTaskById(taskId);
        model.addAttribute("param", map);
        return "/timedTask/editTimedTask";
    

    /**
     * @Author yrd
     * @Description 启动定时任务
     * @Date 10:39 2021/12/20
     * @Param [taskId]
     * @return java.lang.String
     **/
    @RequestMapping("/startTask")
    @ResponseBody
    public String startTask(@RequestParam("taskId") String taskId) throws Exception 
        return timedTaskService.startTask(taskId);
    

    /**
     * @Author yrd
     * @Description 关闭定时任务
     * @Date 10:39 2021/12/20
     * @Param [taskId]
     * @return java.lang.String
     **/
    @RequestMapping("/stopTask")
    @ResponseBody
    public String stopTask(@RequestParam("taskId") String taskId) throws Exception 
        return timedTaskService.stopTask(taskId);
    

    /**
     * @Author yrd
     * @Description 修改定时任务
     * @Date 10:39 2021/12/20
     * @Param [request]
     * @return java.lang.String
     **/
    @RequestMapping("/editTimedTask")
    @ResponseBody
    public String editTimedTask(HttpServletRequest request) throws Exception 
        Map param = new HashMap();
        param.put("taskId", request.getParameter("taskId").toString());
        param.put("taskName", request.getParameter("taskName").toString());
        param.put("cron", request.getParameter("cron").toString());
        return timedTaskService.editTimedTask(param);
    

        前端、service及dao层的对应代码本文不再详述,以上功能亲测可以使用,并已运用到实际项目当中去。

以上是关于ThreadPoolTaskScheduler实现动态管理定时任务的主要内容,如果未能解决你的问题,请参考以下文章

ThreadPoolTaskScheduler实现动态管理定时任务

ThreadPoolTaskScheduler实现动态管理定时任务

ThreadPoolTaskScheduler实现动态管理定时任务

ThreadPoolTaskScheduler动态添加、移除定时任务

ThreadPoolTaskScheduler 周期任务原理

SpringBoot 自定义ThreadPoolTaskScheduler线程池执行定时任务