spring-boot-starter-quartz集群实践

Posted eelve

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring-boot-starter-quartz集群实践相关的知识,希望对你有一定的参考价值。

【**前情提要**】由于项目需要,需要一个定时任务集群,故此有了这个spring-boot-starter-quartz集群的实践。springboot的版本为:2.1.6.RELEASE;quartz的版本为:2.3.1.假如这里一共有两个定时任务的节点,它们的代码完全一样。 --- # 壹.jar包依赖 ~~~pom 1.8 org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-quartz mysql mysql-connector-java runtime org.springframework.boot spring-boot-starter-jdbc org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test ~~~ 这里选择将定时任务的数据入库,避免数据直接存在内存中,因应用重启造成的数据丢失和做集群控制。 # 贰、项目配置 ~~~yaml spring: server: port: 8080 servlet: context-path: /lovin datasource: url: jdbc:mysql://127.0.0.1:3306/training?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true username: root password: root driver-class-name: com.mysql.cj.jdbc.Driver quartz: job-store-type: jdbc #数据库方式 jdbc: initialize-schema: never #不初始化表结构 properties: org: quartz: scheduler: instanceId: AUTO #默认主机名和时间戳生成实例ID,可以是任何字符串,但对于所有调度程序来说,必须是唯一的 对应qrtz_scheduler_state INSTANCE_NAME字段 #instanceName: clusteredScheduler #quartzScheduler jobStore: class: org.quartz.impl.jdbcjobstore.JobStoreTX #持久化配置 driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate #我们仅为数据库制作了特定于数据库的代理 useProperties: false #以指示JDBCJobStore将JobDataMaps中的所有值都作为字符串,因此可以作为名称 - 值对存储而不是在BLOB列中以其序列化形式存储更多复杂的对象。从长远来看,这是更安全的,因为您避免了将非String类序列化为BLOB的类版本问题。 tablePrefix: qrtz_ #数据库表前缀 misfireThreshold: 60000 #在被认为“失火”之前,调度程序将“容忍”一个Triggers将其下一个启动时间通过的毫秒数。默认值(如果您在配置中未输入此属性)为60000(60秒)。 clusterCheckinInterval: 5000 #设置此实例“检入”*与群集的其他实例的频率(以毫秒为单位)。影响检测失败实例的速度。 isClustered: true #打开群集功能 threadPool: #连接池 class: org.quartz.simpl.SimpleThreadPool threadCount: 10 threadPriority: 5 threadsInheritContextClassLoaderOfInitializingThread: true ~~~ **这里需要注意的是两个节点的端口号应该不一致,避免冲突** # 叁、实现一个Job ~~~java @Slf4j public class Job extends QuartzJobBean @Override protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException // 获取参数 JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); // 业务逻辑 ... log.info("------springbootquartzonejob执行"+jobDataMap.get("name").toString()+"###############"+jobExecutionContext.getTrigger()); ~~~ **其中的日志输出是为了便于观察任务执行情况** # 肆、封装定时任务操作 ~~~java @Service public class QuartzService @Autowired private Scheduler scheduler; @PostConstruct public void startScheduler() try scheduler.start(); catch (SchedulerException e) e.printStackTrace(); /** * 增加一个job * * @param jobClass * 任务实现类 * @param jobName * 任务名称 * @param jobGroupName * 任务组名 * @param jobTime * 时间表达式 (这是每隔多少秒为一次任务) * @param jobTimes * 运行的次数 (<0:表示不限次数) * @param jobData * 参数 */ public void addJob(Class jobClass, String jobName, String jobGroupName, int jobTime, int jobTimes, Map jobData) try // 任务名称和组构成任务key JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName) .build(); // 设置job参数 if(jobData!= null && jobData.size()>0) jobDetail.getJobDataMap().putAll(jobData); // 使用simpleTrigger规则 Trigger trigger = null; if (jobTimes < 0) trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName) .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime)) .startNow().build(); else trigger = TriggerBuilder .newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder .repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes)) .startNow().build(); scheduler.scheduleJob(jobDetail, trigger); catch (SchedulerException e) e.printStackTrace(); /** * 增加一个job * * @param jobClass * 任务实现类 * @param jobName * 任务名称(建议唯一) * @param jobGroupName * 任务组名 * @param jobTime * 时间表达式 (如:0/5 * * * * ? ) * @param jobData * 参数 */ public void addJob(Class jobClass, String jobName, String jobGroupName, String jobTime, Map jobData) try // 创建jobDetail实例,绑定Job实现类 // 指明job的名称,所在组的名称,以及绑定job类 // 任务名称和组构成任务key JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName) .build(); // 设置job参数 if(jobData!= null && jobData.size()>0) jobDetail.getJobDataMap().putAll(jobData); // 定义调度触发规则 // 使用cornTrigger规则 // 触发器key Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName) .startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND)) .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).startNow().build(); // 把作业和触发器注册到任务调度中 scheduler.scheduleJob(jobDetail, trigger); catch (Exception e) e.printStackTrace(); /** * 修改 一个job的 时间表达式 * * @param jobName * @param jobGroupName * @param jobTime */ public void updateJob(String jobName, String jobGroupName, String jobTime) try TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); trigger = trigger.getTriggerBuilder().withIdentity(triggerKey) .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build(); // 重启触发器 scheduler.rescheduleJob(triggerKey, trigger); catch (SchedulerException e) e.printStackTrace(); /** * 删除任务一个job * * @param jobName * 任务名称 * @param jobGroupName * 任务组名 */ public void deleteJob(String jobName, String jobGroupName) try scheduler.deleteJob(new JobKey(jobName, jobGroupName)); catch (Exception e) e.printStackTrace(); /** * 暂停一个job * * @param jobName * @param jobGroupName */ public void pauseJob(String jobName, String jobGroupName) try JobKey jobKey = JobKey.jobKey(jobName, jobGroupName); scheduler.pauseJob(jobKey); catch (SchedulerException e) e.printStackTrace(); /** * 恢复一个job * * @param jobName * @param jobGroupName */ public void resumeJob(String jobName, String jobGroupName) try JobKey jobKey = JobKey.jobKey(jobName, jobGroupName); scheduler.resumeJob(jobKey); catch (SchedulerException e) e.printStackTrace(); /** * 立即执行一个job * * @param jobName * @param jobGroupName */ public void runAJobNow(String jobName, String jobGroupName) try JobKey jobKey = JobKey.jobKey(jobName, jobGroupName); scheduler.triggerJob(jobKey); catch (SchedulerException e) e.printStackTrace(); /** * 获取所有计划中的任务列表 * * @return */ public List> queryAllJob() List> jobList = null; try GroupMatcher matcher = GroupMatcher.anyJobGroup(); Set jobKeys = scheduler.getJobKeys(matcher); jobList = new ArrayList>(); for (JobKey jobKey : jobKeys) List triggers = scheduler.getTriggersOfJob(jobKey); for (Trigger trigger : triggers) Map map = new HashMap<>(); map.put("jobName", jobKey.getName()); map.put("jobGroupName", jobKey.getGroup()); map.put("description", "触发器:" + trigger.getKey()); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); map.put("jobStatus", triggerState.name()); if (trigger instanceof CronTrigger) CronTrigger cronTrigger = (CronTrigger) trigger; String cronExpression = cronTrigger.getCronExpression(); map.put("jobTime", cronExpression); jobList.add(map); catch (SchedulerException e) e.printStackTrace(); return jobList; /** * 获取所有正在运行的job * * @return */ public List> queryRunJob() List> jobList = null; try List executingJobs = scheduler.getCurrentlyExecutingJobs(); jobList = new ArrayList>(executingJobs.size()); for (JobExecutionContext executingJob : executingJobs) Map map = new HashMap(); JobDetail jobDetail = executingJob.getJobDetail(); JobKey jobKey = jobDetail.getKey(); Trigger trigger = executingJob.getTrigger(); map.put("jobName", jobKey.getName()); map.put("jobGroupName", jobKey.getGroup()); map.put("description", "触发器:" + trigger.getKey()); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); map.put("jobStatus", triggerState.name()); if (trigger instanceof CronTrigger) CronTrigger cronTrigger = (CronTrigger) trigger; String cronExpression = cronTrigger.getCronExpression(); map.put("jobTime", cronExpression); jobList.add(map); catch (SchedulerException e) e.printStackTrace(); return jobList; ~~~ # 陆、初始化任务 这里不准备给用户用web界面来配置定时任务,故此采用**CommandLineRunner**来子啊应用初始化的时候来初始化任务。只需要实现CommandLineRunner的run()方法即可。 ~~~java @Override public void run(String... args) throws Exception HashMap map = new HashMap<>(); map.put("name",1); quartzService.deleteJob("job", "test"); quartzService.addJob(Job.class, "job", "test", "0 * * * * ?", map); map.put("name",2); quartzService.deleteJob("job2", "test"); quartzService.addJob(Job.class, "job2", "test", "10 * * * * ?", map); map.put("name",3); quartzService.deleteJob("job3", "test2"); quartzService.addJob(Job.class, "job3", "test2", "15 * * * * ?", map); ~~~ # 柒、测试验证 分别夏侯启动两个应用,然后观察任务执行,以及在运行过程中杀死某个服务,来观察定时任务的执行。 ![SpringbootquartzoneApplication](https://img2018.cnblogs.com/blog/1035487/201908/1035487-20190810230438049-1371830870.png) ![SpringbootquartztwoApplication](https://img2018.cnblogs.com/blog/1035487/201908/1035487-20190810230443158-660952439.png) 【**写在后面的话**】下面给出的是所需要脚本的连接地址:[脚本下载地址](http://www.quartz-scheduler.org/downloads/),另外这边又一个自己实现的[demo](https://github.com/eelve/springbootquartzs.git)

以上是关于spring-boot-starter-quartz集群实践的主要内容,如果未能解决你的问题,请参考以下文章