SpringBoot 定时任务动态管理通用解决方案
Posted Java知音_
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot 定时任务动态管理通用解决方案相关的知识,希望对你有一定的参考价值。
一、功能说明
SpringBoot的定时任务的加强工具,实现对SpringBoot原生的定时任务进行动态管理,完全兼容原生@Scheduled注解,无需对原本的定时任务进行修改
二、快速使用
具体的功能已经封装成SpringBoot-starter即插即用
<dependency>
<groupId>com.github.guoyixing</groupId>
<artifactId>spring-boot-starter-super-scheduled</artifactId>
<version>0.3.1</version>
</dependency>
使用方法和源码:
码云:https://gitee.com/qiaodaimadewangcai/super-scheduled
github:https://github.com/guoyixing/super-scheduled
三、实现原理
1、动态管理实现
(1) 配置管理介绍
@Component("superScheduledConfig")
public class SuperScheduledConfig
/**
* 执行定时任务的线程池
*/
private ThreadPoolTaskScheduler taskScheduler;
/**
* 定时任务名称与定时任务回调钩子 的关联关系容器
*/
private Map<String, ScheduledFuture> nameToScheduledFuture = new ConcurrentHashMap<>();
/**
* 定时任务名称与定时任务需要执行的逻辑 的关联关系容器
*/
private Map<String, Runnable> nameToRunnable = new ConcurrentHashMap<>();
/**
* 定时任务名称与定时任务的源信息 的关联关系容器
*/
private Map<String, ScheduledSource> nameToScheduledSource = new ConcurrentHashMap<>();
/* 普通的get/sets省略 */
(2) 使用后处理器拦截SpringBoot原本的定时任务
实现
ApplicationContextAware
接口拿到SpringBoot的上下文实现
BeanPostProcessor
接口,将这个类标记为后处理器,后处理器会在每个bean实例化之后执行使用
@DependsOn
注解强制依赖SuperScheduledConfig
类,让SpringBoot实例化SuperScheduledPostProcessor
类之前先实例化SuperScheduledConfig
类主要实现逻辑在
postProcessAfterInitialization()
方法中
@DependsOn("superScheduledConfig")
@Component
@Order
public class SuperScheduledPostProcessor implements BeanPostProcessor, ApplicationContextAware
protected final Log logger = LogFactory.getLog(getClass());
private ApplicationContext applicationContext;
/**
* 实例化bean之前的操作
* @param bean bean实例
* @param beanName bean的Name
*/
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException
return bean;
/**
* 实例化bean之后的操作
* @param bean bean实例
* @param beanName bean的Name
*/
@Override
public Object postProcessAfterInitialization(Object bean,
String beanName) throws BeansException
//1.获取配置管理器
SuperScheduledConfig superScheduledConfig = applicationContext.getBean(SuperScheduledConfig.class);
//2.获取当前实例化完成的bean的所有方法
Method[] methods = bean.getClass().getDeclaredMethods();
//循环处理对每个方法逐一处理
if (methods.length > 0)
for (Method method : methods)
//3.尝试在该方法上获取@Scheduled注解(SpringBoot的定时任务注解)
Scheduled annotation = method.getAnnotation(Scheduled.class);
//如果无法获取到@Scheduled注解,就跳过这个方法
if (annotation == null)
continue;
//4.创建定时任务的源属性
//创建定时任务的源属性(用来记录定时任务的配置,初始化的时候记录的是注解上原本的属性)
ScheduledSource scheduledSource = new ScheduledSource(annotation, method, bean);
//对注解上获取到源属性中的属性进行检测
if (!scheduledSource.check())
throw new SuperScheduledException("在" + beanName + "Bean中" + method.getName() + "方法的注解参数错误");
//生成定时任务的名称(id),使用beanName+“.”+方法名
String name = beanName + "." + method.getName();
//将以key-value的形式,将源数据存入配置管理器中,key:定时任务的名称 value:源数据
superScheduledConfig.addScheduledSource(name, scheduledSource);
try
//5.将原本SpringBoot的定时任务取消掉
clearOriginalScheduled(annotation);
catch (Exception e)
throw new SuperScheduledException("在关闭原始方法" + beanName + method.getName() + "时出现错误");
//最后bean保持原有返回
return bean;
/**
* 修改注解原先的属性
* @param annotation 注解实例对象
* @throws Exception
*/
private void clearOriginalScheduled(Scheduled annotation) throws Exception
changeAnnotationValue(annotation, "cron", Scheduled.CRON_DISABLED);
changeAnnotationValue(annotation, "fixedDelay", -1L);
changeAnnotationValue(annotation, "fixedDelayString", "");
changeAnnotationValue(annotation, "fixedRate", -1L);
changeAnnotationValue(annotation, "fixedRateString", "");
changeAnnotationValue(annotation, "initialDelay", -1L);
changeAnnotationValue(annotation, "initialDelayString", "");
/**
* 获取SpringBoot的上下文
* @param applicationContext SpringBoot的上下文
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
this.applicationContext = applicationContext;
(3) 使用ApplicationRunner
初始化自定义的定时任务运行器
实现
ApplicationContextAware
接口拿到SpringBoot的上下文使用
@DependsOn
注解强制依赖threadPoolTaskScheduler
类实现
ApplicationRunner
接口,在所有bean初始化结束之后,运行自定义逻辑主要实现逻辑在
run()
方法中
@DependsOn("threadPoolTaskScheduler")
@Component
public class SuperScheduledApplicationRunner implements ApplicationRunner, ApplicationContextAware
protected final Log logger = LogFactory.getLog(getClass());
private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private ApplicationContext applicationContext;
/**
* 定时任务配置管理器
*/
@Autowired
private SuperScheduledConfig superScheduledConfig;
/**
* 定时任务执行线程
*/
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
@Override
public void run(ApplicationArguments args)
//1.定时任务配置管理器中缓存 定时任务执行线程
superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler);
//2.获取所有定时任务源数据
Map<String, ScheduledSource> nameToScheduledSource = superScheduledConfig.getNameToScheduledSource();
//逐一处理定时任务
for (String name : nameToScheduledSource.keySet())
//3.获取定时任务源数据
ScheduledSource scheduledSource = nameToScheduledSource.get(name);
//4.获取所有增强类
String[] baseStrengthenBeanNames = applicationContext.getBeanNamesForType(BaseStrengthen.class);
//5.创建执行控制器
SuperScheduledRunnable runnable = new SuperScheduledRunnable();
//配置执行控制器
runnable.setMethod(scheduledSource.getMethod());
runnable.setBean(scheduledSource.getBean());
//6.逐一处理增强类(增强器实现原理后面具体分析)
List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
for (String baseStrengthenBeanName : baseStrengthenBeanNames)
//7.将增强器代理成point
Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
//创建代理
Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
proxy.setSuperScheduledName(name);
//8.所有的points连成起来
points.add(proxy);
//将point形成调用链
runnable.setChain(new Chain(points));
//将执行逻辑封装并缓存到定时任务配置管理器中
superScheduledConfig.addRunnable(name, runnable::invoke);
try
//8.启动定时任务
ScheduledFuture<?> schedule = ScheduledFutureFactory.create(threadPoolTaskScheduler
, scheduledSource, runnable::invoke);
//将线程回调钩子存到任务配置管理器中
superScheduledConfig.addScheduledFuture(name, schedule);
logger.info(df.format(LocalDateTime.now()) + "任务" + name + "已经启动...");
catch (Exception e)
throw new SuperScheduledException("任务" + name + "启动失败,错误信息:" + e.getLocalizedMessage());
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
this.applicationContext = applicationContext;
(4) 进行动态管理
@Component
public class SuperScheduledManager
protected final Log logger = LogFactory.getLog(getClass());
private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Autowired
private SuperScheduledConfig superScheduledConfig;
/**
* 修改Scheduled的执行周期
*
* @param name scheduled的名称
* @param cron cron表达式
*/
public void setScheduledCron(String name, String cron)
//终止原先的任务
cancelScheduled(name);
//创建新的任务
ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
scheduledSource.clear();
scheduledSource.setCron(cron);
addScheduled(name, scheduledSource);
/**
* 修改Scheduled的fixedDelay
*
* @param name scheduled的名称
* @param fixedDelay 上一次执行完毕时间点之后多长时间再执行
*/
public void setScheduledFixedDelay(String name, Long fixedDelay)
//终止原先的任务
cancelScheduled(name);
//创建新的任务
ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
scheduledSource.clear();
scheduledSource.setFixedDelay(fixedDelay);
addScheduled(name, scheduledSource);
/**
* 修改Scheduled的fixedRate
*
* @param name scheduled的名称
* @param fixedRate 上一次开始执行之后多长时间再执行
*/
public void setScheduledFixedRate(String name, Long fixedRate)
//终止原先的任务
cancelScheduled(name);
//创建新的任务
ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
scheduledSource.clear();
scheduledSource.setFixedRate(fixedRate);
addScheduled(name, scheduledSource);
/**
* 查询所有启动的Scheduled
*/
public List<String> getRunScheduledName()
Set<String> names = superScheduledConfig.getNameToScheduledFuture().keySet();
return new ArrayList<>(names);
/**
* 查询所有的Scheduled
*/
public List<String> getAllSuperScheduledName()
Set<String> names = superScheduledConfig.getNameToRunnable().keySet();
return new ArrayList<>(names);
/**
* 终止Scheduled
*
* @param name scheduled的名称
*/
public void cancelScheduled(String name)
ScheduledFuture scheduledFuture = superScheduledConfig.getScheduledFuture(name);
scheduledFuture.cancel(true);
superScheduledConfig.removeScheduledFuture(name);
logger.info(df.format(LocalDateTime.now()) + "任务" + name + "已经终止...");
/**
* 启动Scheduled
*
* @param name scheduled的名称
* @param scheduledSource 定时任务的源信息
*/
public void addScheduled(String name, ScheduledSource scheduledSource)
if (getRunScheduledName().contains(name))
throw new SuperScheduledException("定时任务" + name + "已经被启动过了");
if (!scheduledSource.check())
throw new SuperScheduledException("定时任务" + name + "源数据内容错误");
scheduledSource.refreshType();
Runnable runnable = superScheduledConfig.getRunnable(name);
ThreadPoolTaskScheduler taskScheduler = superScheduledConfig.getTaskScheduler();
ScheduledFuture<?> schedule = ScheduledFutureFactory.create(taskScheduler, scheduledSource, runnable);
logger.info(df.format(LocalDateTime.now()) + "任务" + name + "已经启动...");
superScheduledConfig.addScheduledSource(name, scheduledSource);
superScheduledConfig.addScheduledFuture(name, schedule);
/**
* 以cron类型启动Scheduled
*
* @param name scheduled的名称
* @param cron cron表达式
*/
public void addCronScheduled(String name, String cron)
ScheduledSource scheduledSource = new ScheduledSource();
scheduledSource.setCron(cron);
addScheduled(name, scheduledSource);
/**
* 以fixedDelay类型启动Scheduled
*
* @param name scheduled的名称
* @param fixedDelay 上一次执行完毕时间点之后多长时间再执行
* @param initialDelay 第一次执行的延迟时间
*/
public void addFixedDelayScheduled(String name, Long fixedDelay, Long... initialDelay)
ScheduledSource scheduledSource = new ScheduledSource();
scheduledSource.setFixedDelay(fixedDelay);
if (initialDelay != null && initialDelay.length == 1)
scheduledSource.setInitialDelay(initialDelay[0]);
else if (initialDelay != null && initialDelay.length > 1)
throw new SuperScheduledException("第一次执行的延迟时间只能传入一个参数");
addScheduled(name, scheduledSource);
/**
* 以fixedRate类型启动Scheduled
*
* @param name scheduled的名称
* @param fixedRate 上一次开始执行之后多长时间再执行
* @param initialDelay 第一次执行的延迟时间
*/
public void addFixedRateScheduled(String name, Long fixedRate, Long... initialDelay)
ScheduledSource scheduledSource = new ScheduledSource();
scheduledSource.setFixedRate(fixedRate);
if (initialDelay != null && initialDelay.length == 1)
scheduledSource.setInitialDelay(initialDelay[0]);
else if (initialDelay != null && initialDelay.length > 1)
throw new SuperScheduledException("第一次执行的延迟时间只能传入一个参数");
addScheduled(name, scheduledSource);
/**
* 手动执行一次任务
*
* @param name scheduled的名称
*/
public void runScheduled(String name)
Runnable runnable = superScheduledConfig.getRunnable(name);
runnable.run();
2、增强接口实现
增强器实现的整体思路与SpringAop的思路一致,实现没有Aop复杂
(1) 增强接口
@Order(Ordered.HIGHEST_PRECEDENCE)
public interface BaseStrengthen
/**
* 前置强化方法
*
* @param bean bean实例(或者是被代理的bean)
* @param method 执行的方法对象
* @param args 方法参数
*/
void before(Object bean, Method method, Object[] args);
/**
* 后置强化方法
* 出现异常不会执行
* 如果未出现异常,在afterFinally方法之后执行
*
* @param bean bean实例(或者是被代理的bean)
* @param method 执行的方法对象
* @param args 方法参数
*/
void after(Object bean, Method method, Object[] args);
/**
* 异常强化方法
*
* @param bean bean实例(或者是被代理的bean)
* @param method 执行的方法对象
* @param args 方法参数
*/
void exception(Object bean, Method method, Object[] args);
/**
* Finally强化方法,出现异常也会执行
*
* @param bean bean实例(或者是被代理的bean)
* @param method 执行的方法对象
* @param args 方法参数
*/
void afterFinally(Object bean, Method method, Object[] args);
(2) 代理抽象类
public abstract class Point
/**
* 定时任务名
*/
private String superScheduledName;
/**
* 抽象的执行方法,使用代理实现
* @param runnable 定时任务执行器
*/
public abstract Object invoke(SuperScheduledRunnable runnable);
/* 普通的get/sets省略 */
(3) 调用链类
public class Chain
private List<Point> list;
private int index = -1;
/**
* 索引自增1
*/
public int incIndex()
return ++index;
/**
* 索引还原
*/
public void resetIndex()
this.index = -1;
(4) cglib动态代理实现
使用cglib代理增强器,将增强器全部代理成调用链节点Point
public class RunnableBaseInterceptor implements MethodInterceptor
/**
* 定时任务执行器
*/
private SuperScheduledRunnable runnable;
/**
* 定时任务增强类
*/
private BaseStrengthen strengthen;
@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable
Object result;
//如果执行的是invoke()方法
if ("invoke".equals(method.getName()))
//前置强化方法
strengthen.before(obj, method, args);
try
//调用执行器中的invoke()方法
result = runnable.invoke();
catch (Exception e)
//异常强化方法
strengthen.exception(obj, method, args);
throw new SuperScheduledException(strengthen.getClass() + "中强化执行时发生错误", e);
finally
//Finally强化方法,出现异常也会执行
strengthen.afterFinally(obj, method, args);
//后置强化方法
strengthen.after(obj, method, args);
else
//直接执行方法
result = methodProxy.invokeSuper(obj, args);
return result;
public RunnableBaseInterceptor(Object object, SuperScheduledRunnable runnable)
this.runnable = runnable;
if (BaseStrengthen.class.isAssignableFrom(object.getClass()))
this.strengthen = (BaseStrengthen) object;
else
throw new SuperScheduledException(object.getClass() + "对象不是BaseStrengthen类型");
public RunnableBaseInterceptor()
(5) 定时任务执行器实现
public class SuperScheduledRunnable
/**
* 原始的方法
*/
private Method method;
/**
* 方法所在的bean
*/
private Object bean;
/**
* 增强器的调用链
*/
private Chain chain;
public Object invoke()
Object result;
//索引自增1
if (chain.incIndex() == chain.getList().size())
//调用链中的增强方法已经全部执行结束
try
//调用链索引初始化
chain.resetIndex();
//增强器全部执行完毕,执行原本的方法
result = method.invoke(bean);
catch (IllegalAccessException | InvocationTargetException e)
throw new SuperScheduledException(e.getLocalizedMessage());
else
//获取被代理后的方法增强器
Point point = chain.getList().get(chain.getIndex());
//执行增强器代理
//增强器代理中,会回调方法执行器,形成调用链,逐一运行调用链中的增强器
result = point.invoke(this);
return result;
/* 普通的get/sets省略 */
(6) 增强器代理逻辑
com.gyx.superscheduled.core.SuperScheduledApplicationRunner
类中的代码片段
//创建执行控制器
SuperScheduledRunnable runnable = new SuperScheduledRunnable();
runnable.setMethod(scheduledSource.getMethod());
runnable.setBean(scheduledSource.getBean());
//用来存放 增强器的代理对象
List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
//循环所有的增强器的beanName
for (String baseStrengthenBeanName : baseStrengthenBeanNames)
//获取增强器的bean对象
Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
//将增强器代理成Point节点
Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
proxy.setSuperScheduledName(name);
//增强器的代理对象缓存到list中
points.add(proxy);
//将增强器代理实例的集合生成调用链
//执行控制器中设置调用链
runnable.setChain(new Chain(points));
来源:blog.csdn.net/qq_34886352/article/details/106494637
推荐
PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。点“在看”支持我们吧!
以上是关于SpringBoot 定时任务动态管理通用解决方案的主要内容,如果未能解决你的问题,请参考以下文章