ThreadPoolTaskScheduler动态添加、移除定时任务

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolTaskScheduler动态添加、移除定时任务相关的知识,希望对你有一定的参考价值。

参考技术A

最近遇到一个需求,定时任务的业务逻辑不会改变,但需要动态添加、移除定时任务,而且定时执行时间有可能随时改变,这可怎么实现呢?
首先,配置定时任务线程池;

第二步,建立任务,里面包含了定时任务需要实现的业务逻辑;

第三步,应用定时任务,包括添加、移除;

最后,运行入口程序,打开浏览器进行测试; 通过浏览器分别执行了localhost:8080/index/insert/1000/10、localhost:8080/index/insert/2000/20,也就是添加了两个任务,任务1000每10s执行一次,任务2000每20s执行一次;

执行 http://localhost:8080/index/remove/1000 ,把1000的任务移除掉,再看执行结果,只剩下任务2000,ok,动态添加、移除定时任务编码完成。

当然,这里为了测试,把管理任务的队列直接放到了Controller里,实际应用时应保持全局唯一。
最后总结

通过这个需求,我们又用到了一个类ThreadPoolTaskScheduler,它有别于ThreadPoolTaskExecutor类,有兴趣有时间的可以查看源码。

动态添加、移除定时任务的操作流程,大致可以分为以下四个步骤:

1.建立一个定时任务线程池;

2.为定时任务线程池建立一个队列,来管理这些任务;

3.根据唯一标识,往定时任务线程池和队列里分别添加这个任务;

4.根据唯一标识,从定时任务线程池里取消一个任务,并从队列里移除这个任务。

基于springboot ThreadPoolTaskScheduler类实现定时任务动态添加修改

ThreadPoolTaskScheduler的使用 - 西红柿里没有番茄 - 博客园 (cnblogs.com)https://www.cnblogs.com/lyd447113735/p/14955337.html(1357条消息) 在 SpringBoot 中使用 ThreadPoolTaskScheduler 实现定时任务_Wayfreem的博客-CSDN博客_threadpooltaskschedulerhttps://blog.csdn.net/qq_18948359/article/details/125499389

创建  ScheduledController

package com.example.demo.task.controller;

import com.example.demo.task.entity.ScheduledTaskBean;
import com.example.demo.task.service.ScheduledTaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * @program: teak-damo
 * @author: xlk
 *
 * 定时任务 controller
 */
@RestController
@RequestMapping("/scheduled")
public class ScheduledController 
    @Autowired
    private ScheduledTaskService scheduledTaskService;
    /**
     * 所有任务列表 /scheduled/taskList
     */
    @RequestMapping("/taskList")
    public List<ScheduledTaskBean> taskList() 
        return scheduledTaskService.taskList();
    

    /**
     * 根据任务key => 启动任务
     */
    @RequestMapping("/start")
    public String start(@RequestParam("taskKey") String taskKey) 
        scheduledTaskService.start(taskKey);
        return "start success";
    

    /**
     * 根据任务key => 停止任务
     */
    @RequestMapping("/stop")
    public String stop(@RequestParam("taskKey") String taskKey) 
        scheduledTaskService.stop(taskKey);
        return "stop success";
    

    /**
     * 根据任务key => 重启任务
     */
    @RequestMapping("/restart")
    public String restart(@RequestParam("taskKey") String taskKey) 
        scheduledTaskService.restart(taskKey);
        return "restart success";
    


    @RequestMapping("/cron")
    public String cron(@RequestParam("taskKey") String taskKey,@RequestParam("cron") String cron) 
        scheduledTaskService.cron(taskKey,cron);
        return "cron success";
    



创建实体类 ScheduledTaskBean

package com.example.demo.task.entity;

/**
 * @program: teak-damo
 * @author: xlk
 *
 */

public class ScheduledTaskBean 

    /**
     * 任务key值 唯一
     */
    private String taskKey;
    /**
     * 任务描述
     */
    private String taskDesc;
    /**
     * 任务表达式
     */
    private String taskCron;

    /**
     * 程序初始化是否启动 1 是 0 否
     */
    private Integer initStartFlag;

    // 启动状态-0关闭 1启动
    private Integer type;

    /**
     * 当前是否已启动
     */
    private boolean startFlag;



    public String getTaskKey() 
        return taskKey;
    

    public void setTaskKey(String taskKey) 
        this.taskKey = taskKey;
    

    public String getTaskDesc() 
        return taskDesc;
    

    public void setTaskDesc(String taskDesc) 
        this.taskDesc = taskDesc;
    

    public String getTaskCron() 
        return taskCron;
    

    public void setTaskCron(String taskCron) 
        this.taskCron = taskCron;
    

    public Integer getInitStartFlag() 
        return initStartFlag;
    

    public void setInitStartFlag(Integer initStartFlag) 
        this.initStartFlag = initStartFlag;
    

    public boolean isStartFlag() 
        return startFlag;
    

    public void setStartFlag(boolean startFlag) 
        this.startFlag = startFlag;
    

    public Integer getType() 
        return type;
    

    public void setType(Integer type) 
        this.type = type;
    

创建mapper

package com.example.demo.task.mapper;

import com.example.demo.task.entity.ScheduledTaskBean;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

import java.util.List;

/**
 * 定时任务表 mapper
 * demo 项目未使用 xml方式,使用注解方式查询数据以便演示
 */
@Mapper
public interface ScheduledTaskMapper 

    /**
     * 根据key 获取 任务信息
     */
    @Select("select task_key as taskKey,task_desc as taskDesc,task_cron as taskCron,init_start_flag as initStartFlag  from scheduled_task where task_key = '$taskKey' ")
    ScheduledTaskBean getByKey(@Param("taskKey") String taskKey);

    /**
     * 获取程序初始化需要自启的任务信息
     */
    @Select("select task_key as taskKey,task_desc as taskDesc,task_cron as taskCron,init_start_flag as initStartFlag from scheduled_task where init_start_flag=1 ")
    List<ScheduledTaskBean> getAllNeedStartTask();

    /**
     * 获取所有任务
     */
    @Select("select task_key as taskKey,task_desc as taskDesc,task_cron as taskCron,init_start_flag as initStartFlag  from scheduled_task ")
    List<ScheduledTaskBean> getAllTask();


    @Select("update scheduled_task set type = 0 where task_key = '$taskKey' ")
    void stopByKey(@Param("taskKey") String taskKey);

    @Select("update scheduled_task set type = 1 where task_key = '$taskKey' ")
    void startByKey(@Param("taskKey") String taskKey);

    @Select("update scheduled_task set task_cron = '$cron' where task_key = '$taskKey' ")
    void cron(@Param("taskKey") String taskKey,@Param("cron") String cron);




创建services

package com.example.demo.task.service;

import com.example.demo.task.entity.ScheduledTaskBean;

import java.util.List;

/**
 * 定时任务接口
 */
public interface ScheduledTaskService 

    /**
     * 所有任务列表
     */
    List<ScheduledTaskBean> taskList();

    /**
     * 根据任务key 启动任务
     */
    Boolean start(String taskKey);

    /**
     * 根据任务key 停止任务
     */
    Boolean stop(String taskKey);

    /**
     * 根据任务key 重启任务
     */
    Boolean restart(String taskKey);


    /**
     * 程序启动时初始化  ==> 启动所有正常状态的任务
     */
    void initAllTask(List<ScheduledTaskBean> scheduledTaskBeanList);

    void cron(String taskKey, String cron);


创建实现类

package com.example.demo.task.service.impl;

/**
 * @program: teak-damo
 * @author: xlk
 *
 */

import com.example.demo.task.ScheduledTaskJob;
import com.example.demo.task.mapper.ScheduledTaskMapper;
import com.example.demo.task.entity.ScheduledTaskBean;
import com.example.demo.task.service.ScheduledTaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 定时任务实现
 */
@Service
public class ScheduledTaskServiceImpl implements ScheduledTaskService 
    /**
     * 日志
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTaskServiceImpl.class);

    @Autowired
    private ScheduledTaskMapper taskMapper;
    /**
     * 可重入锁
     */
    private ReentrantLock lock = new ReentrantLock();
    /**
     * 定时任务线程池
     */
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;
    /**
     * 所有定时任务存放Map
     * key :任务 key
     * value:任务实现
     */
    @Autowired
    @Qualifier(value = "scheduledTaskJobMap")
    private Map<String, ScheduledTaskJob> scheduledTaskJobMap;

    /**
     * 存放已经启动的任务map
     */
    private Map<String, ScheduledFuture> scheduledFutureMap = new ConcurrentHashMap<>();

    /**
     * 所有任务列表
     */
    @Override
    public List<ScheduledTaskBean> taskList() 
        LOGGER.info(">>>>>> 获取任务列表开始 >>>>>> ");
        //数据库查询所有任务 => 未做分页
        List<ScheduledTaskBean> taskBeanList = taskMapper.getAllTask();
        if (CollectionUtils.isEmpty(taskBeanList)) 
            return new ArrayList<>();
        

        for (ScheduledTaskBean taskBean : taskBeanList) 
            String taskKey = taskBean.getTaskKey();
            //是否启动标记处理
            taskBean.setStartFlag(this.isStart(taskKey));
        
        LOGGER.info(">>>>>> 获取任务列表结束 >>>>>> ");
        return taskBeanList;
    


    /**
     * 根据任务key 启动任务
     */
    @Override
    public Boolean start(String taskKey) 
        LOGGER.info(">>>>>> 启动任务  开始 >>>>>>", taskKey);
        //添加锁放一个线程启动,防止多人启动多次
        lock.lock();
        LOGGER.info(">>>>>> 添加任务启动锁完毕");
        try 
            //校验是否已经启动
            if (this.isStart(taskKey)) 
                LOGGER.info(">>>>>> 当前任务已经启动,无需重复启动!");
                return false;
            
            //校验任务是否存在
            if (!scheduledTaskJobMap.containsKey(taskKey)) 
                return false;
            
            //根据key数据库获取任务配置信息
            ScheduledTaskBean scheduledTask = taskMapper.getByKey(taskKey);
            taskMapper.startByKey(taskKey);
            //启动任务
            this.doStartTask(scheduledTask);
         finally 
            // 释放锁
            lock.unlock();
            LOGGER.info(">>>>>> 释放任务启动锁完毕");
        
        LOGGER.info(">>>>>> 启动任务  结束 >>>>>>", taskKey);
        return true;
    

    /**
     * 根据 key 停止任务
     */
    @Override
    public Boolean stop(String taskKey) 
        LOGGER.info(">>>>>> 进入停止任务   >>>>>>", taskKey);
        //当前任务实例是否存在
        boolean taskStartFlag = scheduledFutureMap.containsKey(taskKey);
        LOGGER.info(">>>>>> 当前任务实例是否存在 ", taskStartFlag);
        if (taskStartFlag) 
            //获取任务实例
            ScheduledFuture scheduledFuture = scheduledFutureMap.get(taskKey);
            //关闭实例
            scheduledFuture.cancel(true);
           taskMapper.stopByKey(taskKey);
        
        LOGGER.info(">>>>>> 结束停止任务   >>>>>>", taskKey);
        return taskStartFlag;
    

    /**
     * 根据任务key 重启任务
     */
    @Override
    public Boolean restart(String taskKey) 
        LOGGER.info(">>>>>> 进入重启任务   >>>>>>", taskKey);
        //先停止
        this.stop(taskKey);
        //再启动
        return this.start(taskKey);
    

    /**
     * 程序启动时初始化  ==> 启动所有正常状态的任务
     */
    @Override
    public void initAllTask(List<ScheduledTaskBean> scheduledTaskBeanList) 
        LOGGER.info("程序启动 ==> 初始化所有任务开始 !size=", scheduledTaskBeanList.size());
        if (CollectionUtils.isEmpty(scheduledTaskBeanList)) 
            return;
        
        for (ScheduledTaskBean scheduledTask : scheduledTaskBeanList) 
            //任务 key
            String taskKey = scheduledTask.getTaskKey();
            //校验是否已经启动
            if (this.isStart(taskKey)) 
                continue;
            
            //启动任务
            this.doStartTask(scheduledTask);
        
        LOGGER.info("程序启动 ==> 初始化所有任务结束 !size=", scheduledTaskBeanList.size());
    

    @Override
    public void cron(String taskKey, String cron) 
        taskMapper.cron(taskKey,cron);
    

    /**
     * 执行启动任务
     */
    private void doStartTask(ScheduledTaskBean scheduledTask) 
        //任务key
        String taskKey = scheduledTask.getTaskKey();
        //定时表达式
        String taskCron = scheduledTask.getTaskCron();
        //获取需要定时调度的接口
        ScheduledTaskJob scheduledTaskJob = scheduledTaskJobMap.get(taskKey);
        LOGGER.info(">>>>>> 任务 [  ] ,cron=", scheduledTask.getTaskDesc(), taskCron);
        ScheduledFuture scheduledFuture = threadPoolTaskScheduler.schedule(scheduledTaskJob,
                new Trigger() 
                    @Override
                    public Date nextExecutionTime(TriggerContext triggerContext) 
                        CronTrigger cronTrigger = new CronTrigger(taskCron);
                        return cronTrigger.nextExecutionTime(triggerContext);
                    
                );
        //将启动的任务放入 map
        scheduledFutureMap.put(taskKey, scheduledFuture);
    

    /**
     * 任务是否已经启动
     */
    private Boolean isStart(String taskKey) 
        //校验是否已经启动
        if (scheduledFutureMap.containsKey(taskKey)) 
            if (!scheduledFutureMap.get(taskKey).isCancelled()) 
                return true;
            
        
        return false;
    



创建 ScheduledTaskConfig

package com.example.demo.task;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.util.Map;

/**
 * @program: teak-damo
 * @author: xlk
 *
 */

@Configuration
public class ScheduledTaskConfig 

    /**
     * 日志
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTaskConfig.class);

    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() 
        LOGGER.info("创建定时任务调度线程池 start");
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(20);
        threadPoolTaskScheduler.setThreadNamePrefix("taskExecutor-");
        threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        threadPoolTaskScheduler.setAwaitTerminationSeconds(60);
        LOGGER.info("创建定时任务调度线程池 end");
        return threadPoolTaskScheduler;
    

    /**
     * 初始化定时任务Map
     * key :任务key
     * value : 执行接口实现
     */
    @Bean(name = "scheduledTaskJobMap")
    public Map<String, ScheduledTaskJob> scheduledTaskJobMap() 
        return ScheduledTaskEnum.initScheduledTask();
    



创建 ScheduledTaskEnum

package com.example.demo.task;

import com.example.demo.task.task.ScheduledTask01;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 定时任务枚举值
 * 注:key 需要与数据库保持一致
 */
public enum ScheduledTaskEnum 

    /**
     * 任务1
     */
    TASK_01("scheduledTask01", new ScheduledTask01());
    /**
     * 定时任务key
     */
    private String taskKey;
    /**
     * 定时任务 执行实现类
     */
    private ScheduledTaskJob scheduledTaskJob;

    ScheduledTaskEnum(String taskKey, ScheduledTaskJob scheduledTaskJob) 
        this.taskKey = taskKey;
        this.scheduledTaskJob = scheduledTaskJob;
    

    /**
     * 初始化 所有任务
     */
    public static Map<String, ScheduledTaskJob> initScheduledTask() 
        if (ScheduledTaskEnum.values().length < 0) 
            return new ConcurrentHashMap<>();
        
        Map<String, ScheduledTaskJob> scheduledTaskJobMap = new ConcurrentHashMap<>();
        for (ScheduledTaskEnum taskEnum : ScheduledTaskEnum.values()) 
            scheduledTaskJobMap.put(taskEnum.taskKey, taskEnum.scheduledTaskJob);
        
        return scheduledTaskJobMap;
    


创建 ScheduledTaskJob 

package com.example.demo.task;

/**
 * @program: teak-damo
 * @author: xlk
 *
 */
/**
 * 调度任务公共父接口
 */
public interface ScheduledTaskJob extends Runnable 



创建 ScheduledTaskRunner 

package com.example.demo.task;

/**
 * @program: teak-damo
 * @author: xlk
 *
 */

import com.example.demo.task.entity.ScheduledTaskBean;
import com.example.demo.task.mapper.ScheduledTaskMapper;
import com.example.demo.task.service.ScheduledTaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @see @Order注解的执行优先级是按value值从小到大顺序。
 * 项目启动完毕后开启需要自启的任务
 */
@Component
@Order(value = 1)
public class ScheduledTaskRunner implements ApplicationRunner 
    /**
     * 日志
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTaskRunner.class);

    @Autowired
    private ScheduledTaskMapper taskMapper;

    @Autowired
    private ScheduledTaskService scheduledTaskService;

    /**
     * 程序启动完毕后,需要自启的任务
     */
    @Override
    public void run(ApplicationArguments applicationArguments) throws Exception 
        LOGGER.info(" >>>>>> 项目启动完毕, 开启 => 需要自启的任务 开始!");
        List<ScheduledTaskBean> scheduledTaskBeanList = taskMapper.getAllNeedStartTask();
        scheduledTaskService.initAllTask(scheduledTaskBeanList);
        LOGGER.info(" >>>>>> 项目启动完毕, 开启 => 需要自启的任务 结束!");
    


创建 DemoApplication 

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableScheduling
@SpringBootApplication
public class DemoApplication 

	public static void main(String[] args) 
		SpringApplication.run(DemoApplication.class, args);
	


创建 application.properties

server.port=8082
spring.datasource.url:jdbc:mysql://192.168.1.215:3306/task_demo?useSSL=false
spring.datasource.username:root
spring.datasource.password:root

数据库脚本



SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for scheduled_task
-- ----------------------------
DROP TABLE IF EXISTS `scheduled_task`;
CREATE TABLE `scheduled_task`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `task_key` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '任务key值(使用bean名称)',
  `task_desc` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '任务描述',
  `task_cron` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '任务表达式',
  `init_start_flag` int(2) NOT NULL DEFAULT 1 COMMENT '程序初始化是否启动 1 是 0 否',
  `create_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',
  `update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE INDEX `uniqu_task_key`(`task_key`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of scheduled_task
-- ----------------------------
INSERT INTO `scheduled_task` VALUES (1, 'scheduledTask01', '定时任务01', '0/5 * * * * ?', 1, '2021-12-16 04:17:44', '2021-12-16 04:17:44');

SET FOREIGN_KEY_CHECKS = 1;

打印日志

demo地址:

链接:https://pan.baidu.com/s/1kiEX5XTW7lKouWe4ODushw 
提取码:1234

以上是关于ThreadPoolTaskScheduler动态添加、移除定时任务的主要内容,如果未能解决你的问题,请参考以下文章

ThreadPoolTaskScheduler实现动态管理定时任务

ThreadPoolTaskScheduler实现动态管理定时任务

ThreadPoolTaskScheduler实现动态管理定时任务

ThreadPoolTaskScheduler动态添加、移除定时任务

ThreadPoolTaskScheduler 周期任务原理

SpringBoot 自定义ThreadPoolTaskScheduler线程池执行定时任务