ThreadPoolTaskScheduler动态添加、移除定时任务
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolTaskScheduler动态添加、移除定时任务相关的知识,希望对你有一定的参考价值。
参考技术A 最近遇到一个需求,定时任务的业务逻辑不会改变,但需要动态添加、移除定时任务,而且定时执行时间有可能随时改变,这可怎么实现呢?
首先,配置定时任务线程池;
第二步,建立任务,里面包含了定时任务需要实现的业务逻辑;
第三步,应用定时任务,包括添加、移除;
最后,运行入口程序,打开浏览器进行测试; 通过浏览器分别执行了localhost:8080/index/insert/1000/10、localhost:8080/index/insert/2000/20,也就是添加了两个任务,任务1000每10s执行一次,任务2000每20s执行一次;
执行 http://localhost:8080/index/remove/1000 ,把1000的任务移除掉,再看执行结果,只剩下任务2000,ok,动态添加、移除定时任务编码完成。
当然,这里为了测试,把管理任务的队列直接放到了Controller里,实际应用时应保持全局唯一。
最后总结
通过这个需求,我们又用到了一个类ThreadPoolTaskScheduler,它有别于ThreadPoolTaskExecutor类,有兴趣有时间的可以查看源码。
动态添加、移除定时任务的操作流程,大致可以分为以下四个步骤:
1.建立一个定时任务线程池;
2.为定时任务线程池建立一个队列,来管理这些任务;
3.根据唯一标识,往定时任务线程池和队列里分别添加这个任务;
4.根据唯一标识,从定时任务线程池里取消一个任务,并从队列里移除这个任务。
基于springboot ThreadPoolTaskScheduler类实现定时任务动态添加修改
ThreadPoolTaskScheduler的使用 - 西红柿里没有番茄 - 博客园 (cnblogs.com)https://www.cnblogs.com/lyd447113735/p/14955337.html(1357条消息) 在 SpringBoot 中使用 ThreadPoolTaskScheduler 实现定时任务_Wayfreem的博客-CSDN博客_threadpooltaskschedulerhttps://blog.csdn.net/qq_18948359/article/details/125499389
创建 ScheduledController
package com.example.demo.task.controller;
import com.example.demo.task.entity.ScheduledTaskBean;
import com.example.demo.task.service.ScheduledTaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* @program: teak-damo
* @author: xlk
*
* 定时任务 controller
*/
@RestController
@RequestMapping("/scheduled")
public class ScheduledController
@Autowired
private ScheduledTaskService scheduledTaskService;
/**
* 所有任务列表 /scheduled/taskList
*/
@RequestMapping("/taskList")
public List<ScheduledTaskBean> taskList()
return scheduledTaskService.taskList();
/**
* 根据任务key => 启动任务
*/
@RequestMapping("/start")
public String start(@RequestParam("taskKey") String taskKey)
scheduledTaskService.start(taskKey);
return "start success";
/**
* 根据任务key => 停止任务
*/
@RequestMapping("/stop")
public String stop(@RequestParam("taskKey") String taskKey)
scheduledTaskService.stop(taskKey);
return "stop success";
/**
* 根据任务key => 重启任务
*/
@RequestMapping("/restart")
public String restart(@RequestParam("taskKey") String taskKey)
scheduledTaskService.restart(taskKey);
return "restart success";
@RequestMapping("/cron")
public String cron(@RequestParam("taskKey") String taskKey,@RequestParam("cron") String cron)
scheduledTaskService.cron(taskKey,cron);
return "cron success";
创建实体类 ScheduledTaskBean
package com.example.demo.task.entity;
/**
* @program: teak-damo
* @author: xlk
*
*/
public class ScheduledTaskBean
/**
* 任务key值 唯一
*/
private String taskKey;
/**
* 任务描述
*/
private String taskDesc;
/**
* 任务表达式
*/
private String taskCron;
/**
* 程序初始化是否启动 1 是 0 否
*/
private Integer initStartFlag;
// 启动状态-0关闭 1启动
private Integer type;
/**
* 当前是否已启动
*/
private boolean startFlag;
public String getTaskKey()
return taskKey;
public void setTaskKey(String taskKey)
this.taskKey = taskKey;
public String getTaskDesc()
return taskDesc;
public void setTaskDesc(String taskDesc)
this.taskDesc = taskDesc;
public String getTaskCron()
return taskCron;
public void setTaskCron(String taskCron)
this.taskCron = taskCron;
public Integer getInitStartFlag()
return initStartFlag;
public void setInitStartFlag(Integer initStartFlag)
this.initStartFlag = initStartFlag;
public boolean isStartFlag()
return startFlag;
public void setStartFlag(boolean startFlag)
this.startFlag = startFlag;
public Integer getType()
return type;
public void setType(Integer type)
this.type = type;
创建mapper
package com.example.demo.task.mapper;
import com.example.demo.task.entity.ScheduledTaskBean;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
/**
* 定时任务表 mapper
* demo 项目未使用 xml方式,使用注解方式查询数据以便演示
*/
@Mapper
public interface ScheduledTaskMapper
/**
* 根据key 获取 任务信息
*/
@Select("select task_key as taskKey,task_desc as taskDesc,task_cron as taskCron,init_start_flag as initStartFlag from scheduled_task where task_key = '$taskKey' ")
ScheduledTaskBean getByKey(@Param("taskKey") String taskKey);
/**
* 获取程序初始化需要自启的任务信息
*/
@Select("select task_key as taskKey,task_desc as taskDesc,task_cron as taskCron,init_start_flag as initStartFlag from scheduled_task where init_start_flag=1 ")
List<ScheduledTaskBean> getAllNeedStartTask();
/**
* 获取所有任务
*/
@Select("select task_key as taskKey,task_desc as taskDesc,task_cron as taskCron,init_start_flag as initStartFlag from scheduled_task ")
List<ScheduledTaskBean> getAllTask();
@Select("update scheduled_task set type = 0 where task_key = '$taskKey' ")
void stopByKey(@Param("taskKey") String taskKey);
@Select("update scheduled_task set type = 1 where task_key = '$taskKey' ")
void startByKey(@Param("taskKey") String taskKey);
@Select("update scheduled_task set task_cron = '$cron' where task_key = '$taskKey' ")
void cron(@Param("taskKey") String taskKey,@Param("cron") String cron);
创建services
package com.example.demo.task.service;
import com.example.demo.task.entity.ScheduledTaskBean;
import java.util.List;
/**
* 定时任务接口
*/
public interface ScheduledTaskService
/**
* 所有任务列表
*/
List<ScheduledTaskBean> taskList();
/**
* 根据任务key 启动任务
*/
Boolean start(String taskKey);
/**
* 根据任务key 停止任务
*/
Boolean stop(String taskKey);
/**
* 根据任务key 重启任务
*/
Boolean restart(String taskKey);
/**
* 程序启动时初始化 ==> 启动所有正常状态的任务
*/
void initAllTask(List<ScheduledTaskBean> scheduledTaskBeanList);
void cron(String taskKey, String cron);
创建实现类
package com.example.demo.task.service.impl;
/**
* @program: teak-damo
* @author: xlk
*
*/
import com.example.demo.task.ScheduledTaskJob;
import com.example.demo.task.mapper.ScheduledTaskMapper;
import com.example.demo.task.entity.ScheduledTaskBean;
import com.example.demo.task.service.ScheduledTaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.ReentrantLock;
/**
* 定时任务实现
*/
@Service
public class ScheduledTaskServiceImpl implements ScheduledTaskService
/**
* 日志
*/
private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTaskServiceImpl.class);
@Autowired
private ScheduledTaskMapper taskMapper;
/**
* 可重入锁
*/
private ReentrantLock lock = new ReentrantLock();
/**
* 定时任务线程池
*/
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
/**
* 所有定时任务存放Map
* key :任务 key
* value:任务实现
*/
@Autowired
@Qualifier(value = "scheduledTaskJobMap")
private Map<String, ScheduledTaskJob> scheduledTaskJobMap;
/**
* 存放已经启动的任务map
*/
private Map<String, ScheduledFuture> scheduledFutureMap = new ConcurrentHashMap<>();
/**
* 所有任务列表
*/
@Override
public List<ScheduledTaskBean> taskList()
LOGGER.info(">>>>>> 获取任务列表开始 >>>>>> ");
//数据库查询所有任务 => 未做分页
List<ScheduledTaskBean> taskBeanList = taskMapper.getAllTask();
if (CollectionUtils.isEmpty(taskBeanList))
return new ArrayList<>();
for (ScheduledTaskBean taskBean : taskBeanList)
String taskKey = taskBean.getTaskKey();
//是否启动标记处理
taskBean.setStartFlag(this.isStart(taskKey));
LOGGER.info(">>>>>> 获取任务列表结束 >>>>>> ");
return taskBeanList;
/**
* 根据任务key 启动任务
*/
@Override
public Boolean start(String taskKey)
LOGGER.info(">>>>>> 启动任务 开始 >>>>>>", taskKey);
//添加锁放一个线程启动,防止多人启动多次
lock.lock();
LOGGER.info(">>>>>> 添加任务启动锁完毕");
try
//校验是否已经启动
if (this.isStart(taskKey))
LOGGER.info(">>>>>> 当前任务已经启动,无需重复启动!");
return false;
//校验任务是否存在
if (!scheduledTaskJobMap.containsKey(taskKey))
return false;
//根据key数据库获取任务配置信息
ScheduledTaskBean scheduledTask = taskMapper.getByKey(taskKey);
taskMapper.startByKey(taskKey);
//启动任务
this.doStartTask(scheduledTask);
finally
// 释放锁
lock.unlock();
LOGGER.info(">>>>>> 释放任务启动锁完毕");
LOGGER.info(">>>>>> 启动任务 结束 >>>>>>", taskKey);
return true;
/**
* 根据 key 停止任务
*/
@Override
public Boolean stop(String taskKey)
LOGGER.info(">>>>>> 进入停止任务 >>>>>>", taskKey);
//当前任务实例是否存在
boolean taskStartFlag = scheduledFutureMap.containsKey(taskKey);
LOGGER.info(">>>>>> 当前任务实例是否存在 ", taskStartFlag);
if (taskStartFlag)
//获取任务实例
ScheduledFuture scheduledFuture = scheduledFutureMap.get(taskKey);
//关闭实例
scheduledFuture.cancel(true);
taskMapper.stopByKey(taskKey);
LOGGER.info(">>>>>> 结束停止任务 >>>>>>", taskKey);
return taskStartFlag;
/**
* 根据任务key 重启任务
*/
@Override
public Boolean restart(String taskKey)
LOGGER.info(">>>>>> 进入重启任务 >>>>>>", taskKey);
//先停止
this.stop(taskKey);
//再启动
return this.start(taskKey);
/**
* 程序启动时初始化 ==> 启动所有正常状态的任务
*/
@Override
public void initAllTask(List<ScheduledTaskBean> scheduledTaskBeanList)
LOGGER.info("程序启动 ==> 初始化所有任务开始 !size=", scheduledTaskBeanList.size());
if (CollectionUtils.isEmpty(scheduledTaskBeanList))
return;
for (ScheduledTaskBean scheduledTask : scheduledTaskBeanList)
//任务 key
String taskKey = scheduledTask.getTaskKey();
//校验是否已经启动
if (this.isStart(taskKey))
continue;
//启动任务
this.doStartTask(scheduledTask);
LOGGER.info("程序启动 ==> 初始化所有任务结束 !size=", scheduledTaskBeanList.size());
@Override
public void cron(String taskKey, String cron)
taskMapper.cron(taskKey,cron);
/**
* 执行启动任务
*/
private void doStartTask(ScheduledTaskBean scheduledTask)
//任务key
String taskKey = scheduledTask.getTaskKey();
//定时表达式
String taskCron = scheduledTask.getTaskCron();
//获取需要定时调度的接口
ScheduledTaskJob scheduledTaskJob = scheduledTaskJobMap.get(taskKey);
LOGGER.info(">>>>>> 任务 [ ] ,cron=", scheduledTask.getTaskDesc(), taskCron);
ScheduledFuture scheduledFuture = threadPoolTaskScheduler.schedule(scheduledTaskJob,
new Trigger()
@Override
public Date nextExecutionTime(TriggerContext triggerContext)
CronTrigger cronTrigger = new CronTrigger(taskCron);
return cronTrigger.nextExecutionTime(triggerContext);
);
//将启动的任务放入 map
scheduledFutureMap.put(taskKey, scheduledFuture);
/**
* 任务是否已经启动
*/
private Boolean isStart(String taskKey)
//校验是否已经启动
if (scheduledFutureMap.containsKey(taskKey))
if (!scheduledFutureMap.get(taskKey).isCancelled())
return true;
return false;
创建 ScheduledTaskConfig
package com.example.demo.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.util.Map;
/**
* @program: teak-damo
* @author: xlk
*
*/
@Configuration
public class ScheduledTaskConfig
/**
* 日志
*/
private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTaskConfig.class);
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler()
LOGGER.info("创建定时任务调度线程池 start");
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(20);
threadPoolTaskScheduler.setThreadNamePrefix("taskExecutor-");
threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskScheduler.setAwaitTerminationSeconds(60);
LOGGER.info("创建定时任务调度线程池 end");
return threadPoolTaskScheduler;
/**
* 初始化定时任务Map
* key :任务key
* value : 执行接口实现
*/
@Bean(name = "scheduledTaskJobMap")
public Map<String, ScheduledTaskJob> scheduledTaskJobMap()
return ScheduledTaskEnum.initScheduledTask();
创建 ScheduledTaskEnum
package com.example.demo.task;
import com.example.demo.task.task.ScheduledTask01;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 定时任务枚举值
* 注:key 需要与数据库保持一致
*/
public enum ScheduledTaskEnum
/**
* 任务1
*/
TASK_01("scheduledTask01", new ScheduledTask01());
/**
* 定时任务key
*/
private String taskKey;
/**
* 定时任务 执行实现类
*/
private ScheduledTaskJob scheduledTaskJob;
ScheduledTaskEnum(String taskKey, ScheduledTaskJob scheduledTaskJob)
this.taskKey = taskKey;
this.scheduledTaskJob = scheduledTaskJob;
/**
* 初始化 所有任务
*/
public static Map<String, ScheduledTaskJob> initScheduledTask()
if (ScheduledTaskEnum.values().length < 0)
return new ConcurrentHashMap<>();
Map<String, ScheduledTaskJob> scheduledTaskJobMap = new ConcurrentHashMap<>();
for (ScheduledTaskEnum taskEnum : ScheduledTaskEnum.values())
scheduledTaskJobMap.put(taskEnum.taskKey, taskEnum.scheduledTaskJob);
return scheduledTaskJobMap;
创建 ScheduledTaskJob
package com.example.demo.task;
/**
* @program: teak-damo
* @author: xlk
*
*/
/**
* 调度任务公共父接口
*/
public interface ScheduledTaskJob extends Runnable
创建 ScheduledTaskRunner
package com.example.demo.task;
/**
* @program: teak-damo
* @author: xlk
*
*/
import com.example.demo.task.entity.ScheduledTaskBean;
import com.example.demo.task.mapper.ScheduledTaskMapper;
import com.example.demo.task.service.ScheduledTaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @see @Order注解的执行优先级是按value值从小到大顺序。
* 项目启动完毕后开启需要自启的任务
*/
@Component
@Order(value = 1)
public class ScheduledTaskRunner implements ApplicationRunner
/**
* 日志
*/
private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTaskRunner.class);
@Autowired
private ScheduledTaskMapper taskMapper;
@Autowired
private ScheduledTaskService scheduledTaskService;
/**
* 程序启动完毕后,需要自启的任务
*/
@Override
public void run(ApplicationArguments applicationArguments) throws Exception
LOGGER.info(" >>>>>> 项目启动完毕, 开启 => 需要自启的任务 开始!");
List<ScheduledTaskBean> scheduledTaskBeanList = taskMapper.getAllNeedStartTask();
scheduledTaskService.initAllTask(scheduledTaskBeanList);
LOGGER.info(" >>>>>> 项目启动完毕, 开启 => 需要自启的任务 结束!");
创建 DemoApplication
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling
@SpringBootApplication
public class DemoApplication
public static void main(String[] args)
SpringApplication.run(DemoApplication.class, args);
创建 application.properties
server.port=8082
spring.datasource.url:jdbc:mysql://192.168.1.215:3306/task_demo?useSSL=false
spring.datasource.username:root
spring.datasource.password:root
数据库脚本
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for scheduled_task
-- ----------------------------
DROP TABLE IF EXISTS `scheduled_task`;
CREATE TABLE `scheduled_task` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`task_key` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '任务key值(使用bean名称)',
`task_desc` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '任务描述',
`task_cron` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '任务表达式',
`init_start_flag` int(2) NOT NULL DEFAULT 1 COMMENT '程序初始化是否启动 1 是 0 否',
`create_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',
`update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `uniqu_task_key`(`task_key`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of scheduled_task
-- ----------------------------
INSERT INTO `scheduled_task` VALUES (1, 'scheduledTask01', '定时任务01', '0/5 * * * * ?', 1, '2021-12-16 04:17:44', '2021-12-16 04:17:44');
SET FOREIGN_KEY_CHECKS = 1;
打印日志
demo地址:
链接:https://pan.baidu.com/s/1kiEX5XTW7lKouWe4ODushw
提取码:1234
以上是关于ThreadPoolTaskScheduler动态添加、移除定时任务的主要内容,如果未能解决你的问题,请参考以下文章
ThreadPoolTaskScheduler实现动态管理定时任务
ThreadPoolTaskScheduler实现动态管理定时任务
ThreadPoolTaskScheduler实现动态管理定时任务
ThreadPoolTaskScheduler动态添加、移除定时任务