Quartz框架汇总
Posted H2-134
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Quartz框架汇总相关的知识,希望对你有一定的参考价值。
目录
(二)SingleThreadScheduledExecutor
(三)Scheduler:调度器,基于trigger的设定执行job
(一)JobDetailFactoryBean或者CronTriggerFactoryBean
一.Quartz理论基础
(一)Timer
1.Quartz中的TaskQueue就是小顶堆的结构,用来存放timeTask
2.TimerThread:任务执行线程,死循环不断检查是否有有任务开始执行了,有就还是在这个线程执行它
3.单线程执行任务,有任务肯互相阻塞
schedule:执行任务超时,会导致后面的任务往后推移,预想在这个间隔内存存在的任务执行就不会执行了,就没有了
scheduleAtFixedRate:任务超时可能导致下一个任务马上开始执行
4.运行时异常会导致timer线程终止
5.任务调度是基于时间的,对系统时间敏感
二.线程池
(一)ScheduledThreadPoolExcutor
1.使用多线程执行任务,不会互相阻塞
2.如果线程失活,会新建线程执行任务
线程抛异常,任务会被丢弃,需要做捕获处理
3.DelayedWorkQueue:小顶堆,无界队列
在定时线程池当中,最大线程数是没有意义的
执行时间距离当前时间越近的任务在队列的前面
用于添加ScheduleFutureTask(继承于FutureTask,实现RunnableScheduledFuture接口)【提供异步执行的能力,并且可以返回执行】
线程池中的线程从DelayQueue中获取ScheduleFutureTask,然后执行任务
实现了Delayed接口,可以通过getDelay方法来获取延迟时间
4.Leader-Follower模式:避免没必要的唤醒和阻塞操作,这样会更加有效,且节省资源
5.应用场景:适用于多个后台线程执行周期性任务,同时为了满足资源管理的需求二需要限制后台线程数量
(二)SingleThreadScheduledExecutor
1.单线程的ScheduleThreadPoolExector
2.应用场景:适用于需要单个后台线程执行周期任务,同时需要保证任务顺序执行
二.quartz使用
(一) Job:封装成JobDetail设置属性
@DisallowConcurrentExecution //禁止并发的执行同一个job的定义的多个实例(jobDetial定义的) @PersistJobDataAfterExecution //持久化JobDetail中的JobDetailMap(对Trigger中的dataMap无效) //如果一个任务不是持久化的,则当没有触发器关联他的时候,Quartz会从scheduler中删除掉他boolean recovering = jobExecutionContext.isRecovering(); //如果一个任务请求恢复,一般是该任务执行期间发生了系统崩溃或其他关闭进程的操作 //当服务再次启动的时候,会再次执行该任务,此时jobExecutionContext.isRecovering()会返回true
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public class TestJob implements Job
@Override
public void execute(JobExecutionContext jobExecutionContext)
throws JobExecutionException
(二)Trigger:触发器:指定执行时间,开始结束时间等
1.优先级:同时触发的Trigger之间才会比较优先级
如果trigger是可恢复的,再恢复后再调度时,优先级不变
2.错过触发:
判断misfire的条件:
(1)job到达触发时间没有被执行
(2)被执行的延迟时间超过Quartz配置的misfireThreshold阈值
可能原因:
1.当job达到触发时间时,所有的线程都被其他job占用,没有可用线程
2.在job需要触发的时间点,schedule停止了(可能是意外停止的)
3.job使用了@DisallowConcurrentExecution注解,job不能并发执行,当达到下一个job执行点的时候,上一个任务还没没有完成
4.job指定了过去开始的执行时间,例如当前时间是8:00:00指定开始的时间为7:00:00
策略:
1.默认使用 MISFIRE_INSTRUCTION_SMART_POLICY策略
2.SimpleTrigger:
1)now*相关的策略,会立即执行第一个misfire任务,同时修改startTime和repeatCount, 一次会重新计算finalFireTime,原计划时间会被打乱
2)next*相关的策略,不会立即执行misfire的任务,也不会修改startTime和repeatCount, 因此finalFireTime也没有发生改变,发生了misfire也是按照原计划进行执行
3.CronTrigger:
1)MISFIRE_INSTRUCTION_SMART_POLICY: Quartz不会判断发生了misfire,立即执行所有发生了misfire的任务,然后按照原计划进行执行。 例如:10:15分立即执行9:00和10:O0的任务,然后等待下一个任务在11:00执行,后续按照原计划执行。 2)MISFIRE_INSTRUCTION_FIRE_ONCE_NOW: 立即执行第一个发生misfire的任务,忽略其他发生misfire的任务,然后按照原计划继续执行。 例如:在10:15立即执行9:00任务,忽略10:00任务,然后等待下一个任务在11:00执行,后续按照原计划执行。 3)MISFIRE_INSTRUCTION_DO_NOTHING: 所有发生的misfire的任务都被忽略,只是按照原计划继续执行
calender: 用于排除时间段
SimpleTrigger:具体时间点,指定间隔重复执行
CronTrigger:cron表达式
(三)Scheduler:调度器,基于trigger的设定执行job
1.SchedulerFactory:
(1)创建Scheduler
(2)DirectSchedulerFactory:在代码里定制Schedule参数
(3)StdSchedulerFactory:读取classpath下的quartz.properties配置来实例化Scheduler
2.JobStore:
存储运行时信息的,包括Trigger,Scheduler,JobDetail,业务锁等
RAMJobStore(内存实现)
JobStoreTX(JDBC,事务由Quartz管理)
JobStoreCMT(JDBC,使用容器事务)
ClusteredJobStore(集群实现)
TerracottaJobStore(Terracotta中间件)
3.ThreadPool:SimpleThreadPool 、自定义线程池
(四)JobDetailMap:保存任务实例的状态信息
1.jobDetail:
默认只在Job被添加到调度程序(任务执行计划表)schedule的时候,存储一次关于该任务的状态信息数据,可以使用注解@PersistJobDataAfterExecution标明在一个任务执行完毕后就存储一次
2.trigger
任务被多个触发器引用的时候,根据不同的触发时机,可以输入不同的输入条件
所以,JobDataMap在Job和Trigger存储的数据并不一致。
三.Quartz集群原理
Quartz急群众每个节点是一个独立的Quartz任务应用,它又管理者其他节点。该集群需要分别对每个节点分别启动或停止,独立的Quartz节点并不与另一个节点或是管理节点通信。Quartz应用是通过共有相同数据库表来感知到另一应用。也就是说只有使用持久化JobStore存储Job和Trigger才能完成Quartz集群。
Quartz的集群部署方案是分布式的,没有负责集中管理的节点,而是利用数据库杭锁的方式来实现集群环境下的并发控制。
一个scheduler实例在集群模式下首先获取0LOCKS表中的行锁;
Quartz主要由两个行级锁。
lock_name | desc |
---|---|
STATE_ACCESS | 状态访问锁 |
TRIGGER_ACCESS | 触发器访问锁 |
Quartz集群争用触发器行锁,锁被占用只能等待,获取触发器行锁之后,先获取需要等待触发的其他触发器信息。数据库更新触发器状态信息,及时是否触发器行锁,供其他调度实例获取,然后在进行触发器任务调度操作,对数据库操作就要先获取行锁。
同一集群下,instanceName必须相同,instanceId可自动生成,isClustered为true,持久化存储,指定数据库类型对应的驱动类和数据源连接。
注意问题
1. 时间同步问题
Quartz实际并不关心你是在相同还是不同的机器上运行节点。当集群放置在不同的机器上时,称之为水平集群。节点跑在同一台机器上时,称之为垂直集群。对于垂直集群,存在着单点故障的问题。这对高可用性的应用来说是无法接受的,因为一旦机器崩溃了,所有的节点也就被终止了。对于水平集群,存在着时间同步问题。
节点用时间戳来通知其他实例它自己的最后检入时间。假如节点的时钟被设置为将来的时间,那么运行中的Scheduler将再也意识不到那个结点已经宕掉了。另一方面,如果某个节点的时钟被设置为过去的时间,也许另一节点就会认定那个节点已宕掉并试图接过它的Job重运行。最简单的同步计算机时钟的方式是使用某一个Internet时间服务器(Internet Time Server ITS)。
2. 节点争抢Job问题
因为Quartz使用了一个随机的负载均衡算法,Job以随机的方式由不同的实例执行。Quartz官网上提到当前,还不存在一个方法来指派(钉住) 一个 Job 到集群中特定的节点。
四.Quartz实现异步通知
1. 一个Job绑定多个触发器
一个Trigger只能绑定一个Job,但是一个Job可以绑定多个Trigger。
那么,若是我们将一个Job上绑定多个触发器,且每个触发器只是触发一次的话,那么,实际上可以实现阶梯式的异步通知,下面的基础代码实现中监听器StartApplicationListener就使用了异步输出。
五.创建一个任务
(一)JobDetailFactoryBean或者CronTriggerFactoryBean
JobDetailFactoryBean或者CronTriggerFactoryBean都实现了FactoryBean接口和InitializingBean接口。虽然我们new JobDetailFactoryBean(),但是实际上是将JobDetail交由的IOC管理。而InitializingBean接口会在属性装载完毕之后,自动的回调afterPropertiesSet()
方法,完成Bean对象的最终构建:
@Bean
public JobDetailFactoryBean jobDetail()
//查询数据库或者配置文件
JobDetailFactoryBean jobDetailFactoryBean=new JobDetailFactoryBean();
jobDetailFactoryBean.setName("");
jobDetailFactoryBean.setBeanName("");
jobDetailFactoryBean.setJobClass((Class<? extends Job>) aClass);
jobDetailFactoryBean.setGroup("");
jobDetailFactoryBean.setDurability(true);
return jobDetailFactoryBean;
或者最后直接进行
JobDetail jobDetail= jobFactory.getObject();
(二)使用builder创建Job或者Trigger
// 定义任务调度实例, 并与TestJob绑定
JobDetail job = JobBuilder.newJob(TestJob.class)
.withIdentity("testJob", "testJobGroup")//调度器的名字和组别
.usingJobData("job","jobDetail")//给调度器当中定义key并赋值,相当于参数,之后可以在job中取到
.storeDurably() //表示当没有触发器与之关联时,仍然将job继续保存在Scheduler中
.build();
// 定义触发器, 会马上执行一次, 接着5秒执行一次
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("testTrigger", "testTriggerGroup")//触发器的名字和分组
.startNow()//定义好后马上就启动
.withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(5))//按照一定的策略触发
.usingJobData("job","jobDedfafafafafdsfal")//给触发器当中定义key并赋值,相当于参数,之后可
以在job中取到
//.repeatForever()表示一直执行
.forJob(jobKey.getName(), jobKey.getGroup()) //制定Trigger和Job的关联关系
.usingJobData(jobDataMap) //具体执行的方法中可以拿到这个传进去的信息。
.build();
六.监听机制
Quartz的监听器用于当任务调度中关注的事件发生时,能够及时获取这一事件的通知。
- Quartz监听的种类:
- JobListener:任务监听;
- TriggerListener:触发器监听;
- SchedulerListener:调度器监听;
- 监听器的作用域
- 全局监听器:能够接收到所有的Job/Trigger的事件通知;
- 局部监听器:只能接收在其上注册Job或者Trigger的事件;
1.JobListener
public interface JobListener
//获取该JobListener的名称
String getName();
//Scheduler在JobDetail将要被执行时调用该方法
void jobToBeExecuted(JobExecutionContext context);
//Scheduler在JobDetail将要被执行时,但又被TriggerListener否决调用
void jobExecutionVetoed(JobExecutionContext context);
//任务执行完毕调用该方法
void jobWasExecuted(JobExecutionContext context,
JobExecutionException jobException);
将JobListener绑定到Scheduler中:
//监听所有的Job
scheduler.getListenerManager().addJobListener(new SimpleJobListener(), EverythingMatcher.allJobs());
//监听特定的Job
scheduler.getListenerManager().addJobListener(new SimpleJobListener(), KeyMatcher.keyEquals(JobKey.jobKey("HelloWorld1_Job", "HelloWorld1_Group")));
//监听同一任务组的Job
scheduler.getListenerManager().addJobListener(new SimpleJobListener(), GroupMatcher.jobGroupEquals("HelloWorld2_Group"));
//监听两个任务组的Job
scheduler.getListenerManager().addJobListener(new SimpleJobListener(), OrMatcher.or(GroupMatcher.jobGroupEquals("HelloWorld1_Group"), GroupMatcher.jobGroupEquals("HelloWorld2_Group")));
2.TriggerListener
触发器监听,即在任务调度过程中,与触发器Trigger相关的事件:触发器触发、触发器未正常触发、触发器触发完成等。
public interface TriggerListener
//获取触发器的名字
public String getName();
//Job的execute()方法被调用时调用该方法。
public void triggerFired(Trigger trigger, JobExecutionContext context);
//Trigger触发后,TriggerListener给了一个选择否定Job的执行。假如该方法返回true,该Job将不会被触发
public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context);
//Trigger错过触发时间触发该方法,此方法不应该含有长时间的处理逻辑。
public void triggerMisfired(Trigger trigger);
//Trigger被触发并且完成Job后触发。
public void triggerComplete(Trigger trigger, JobExecutionContext context,
int triggerInstructionCode);
将TriggerListener绑定到Scheduler中:
//监听所有的Trigger
scheduler.getListenerManager().addTriggerListener(new SimpleTriggerListener("SimpleTrigger"), EverythingMatcher.allTriggers());
//监听特定的Trigger
scheduler.getListenerManager().addTriggerListener(new SimpleTriggerListener("SimpleTrigger"), KeyMatcher.keyEquals(TriggerKey.triggerKey("HelloWord1_Job", "HelloWorld1_Group")));
//监听一组Trigger
scheduler.getListenerManager().addTriggerListener(new SimpleTriggerListener("SimpleTrigger"), GroupMatcher.groupEquals("HelloWorld1_Group"));
//移除监听器
scheduler.getListenerManager().removeTriggerListener("SimpleTrigger");
3.SchedulerListener
SchedulerListener会在Scheduler的生命周期关键事件发生时调用。与Scheduler有关的事件包括:增加一个job/trigger,删除一个job/Trigger,scheduler发生严重错误,关闭Scheduler等。
public interface SchedulerListener
//用于部署JobDetail时调用
public void jobScheduled(Trigger trigger);
//用于卸载JobDetail时调用
public void jobUnscheduled(String triggerName, String triggerGroup);
//当一个Trigger没有触发次数时调用。
public void triggerFinalized(Trigger trigger);
public void triggersPaused(String triggerName, String triggerGroup);
public void triggersResumed(String triggerName, String triggerGroup);
//当一个或一组 JobDetail 暂停时调用这个方法。
public void jobsPaused(String jobName, String jobGroup);
//当一个或一组 Job 从暂停上恢复时调用这个方法。假如是一个 Job 组,jobName 参数将为 null。
public void jobsResumed(String jobName, String jobGroup);
//在 Scheduler 的正常运行期间产生一个严重错误时调用这个方法。
public void schedulerError(String msg, SchedulerException cause);
//当Scheduler 开启时,调用该方法
public void schedulerStarted();
//当Scheduler处于StandBy模式时,调用该方法
public void schedulerInStandbyMode();
//当Scheduler停止时,调用该方法
public void schedulerShutdown();
//当Scheduler中的数据被清除时,调用该方法。
public void schedulingDataCleared();
将SchedulerListener绑定到Scheduler中:
//创建监听
scheduler.getListenerManager().addSchedulerListener(new SimpleSchedulerListener());
//移除监听
scheduler.getListenerManager().removeSchedulerListener(new SimpleSchedulerListener());
七.基础代码实现
Start.java
@SpringBootApplication
public class Start
public static void main(String[] args)
SpringApplication.run(Start.class,args);
QuartzConfiguration.java
@Configuration
public class QuartzConfiguration
@Autowired
private DataSource dataSource;
@Bean
public Scheduler scheduler() throws IOException
return schedulerFactoryBean().getScheduler();
@Bean
public Properties quartzProperties() throws IOException
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource("/application.yml"));
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
@Bean
public Executor schedulerThreadPool()
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
executor.setQueueCapacity(Runtime.getRuntime().availableProcessors());
return executor;
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setSchedulerName("cluster_scheduler");
factory.setDataSource(dataSource);
factory.setApplicationContextSchedulerContextKey("application");
factory.setQuartzProperties(quartzProperties());
factory.setTaskExecutor(schedulerThreadPool());
factory.setStartupDelay(10);
return factory;
StartApplicationListener.java
@Component
//@Controller也可以
public class StartApplicationListener implements ApplicationListener<ContextRefreshedEvent>
@Autowired
private Scheduler scheduler;
@Override
public void onApplicationEvent(ContextRefreshedEvent event)
try
TriggerKey triggerKey1 = TriggerKey.triggerKey("trigger1","group1");
Trigger trigger1 = scheduler.getTrigger(triggerKey1);
if (trigger1 ==null)
trigger1 = TriggerBuilder.newTrigger()
.withIdentity(triggerKey1)
.withSchedule(CronScheduleBuilder.cronSchedule("0/3 * * * * ?"))//cron表达式,每个10秒执行一次
.build();
JobDetail jobDetail = JobBuilder.newJob(TestJob.class)
.withIdentity("job1","group1")
.build();
scheduler.scheduleJob(jobDetail,trigger1);
TriggerKey triggerKey2 = TriggerKey.triggerKey("trigger2","group2");
Trigger trigger2 = scheduler.getTrigger(triggerKey2);
if (trigger2 ==null)
trigger2 = TriggerBuilder.newTrigger()
.withIdentity(triggerKey2)
.withSchedule(CronScheduleBuilder.cronSchedule("0/3 * * * * ?"))//cron表达式,每个10秒执行一次
.build();
JobDetail jobDetail = JobBuilder.newJob(TestJob.class)
.withIdentity("job2", "group2")
.build();
scheduler.scheduleJob(jobDetail, trigger2);
scheduler.start();
catch (SchedulerException e)
e.printStackTrace();
从Start.java程序入口进入程序,俩个trigger一起异步输出如下
//间隔3s输出一次
开启时间 :2022-10-28 16:50:27
开启时间 :2022-10-28 16:50:27
开启时间 :2022-10-28 16:50:30
开启时间 :2022-10-28 16:50:30
开启时间 :2022-10-28 16:50:33
开启时间 :2022-10-28 16:50:33
开启时间 :2022-10-28 16:50:36
开启时间 :2022-10-28 16:50:36进程已结束,退出代码为 -1
八.数据库各表及其作用
1.qrtz_job_details:记录每个任务的详细信息。 2.qrtz_triggers:记录每个触发器的详细信息。 3.qrtz_corn_triggers:记录cornTrigger的信息。 4.qrtz_scheduler_state:记录 调度器(每个机器节点)的生命状态。 5.qrtz_fired_triggers:记录每个正在执行的触发器。 6. qrtz_simple_triggers:存储简单的trigger,包括重复次数,间隔,以及触发次数。 7. qrtz_simprop_triggers:存储CalendarIntervalTrigger和DailyTimeIntervalTrigger两种类型的触发器, 8. qrtz_blob_triggers:自定义的triggers使用blog类型进行存储,非自定义的triggers不会存放在此表中, Quartz提供的triggers包括:CronTrigger,CalendarIntervalTrigger,DailyTimeIntervalTrigger以及SimpleTrigger。 9. qrtz_calendars:以 Blob 类型存储 Quartz 的 Calendar 信息 10. qrtz_paused_trigger_grps:存储已暂停的 Trigger 组的信息 11.qrtz_locks:记录程序的悲观锁(防止多个节点同时执行同一个定时任务)。
本笔记基于图灵的课堂和https://www.jianshu.com/p/ece144448134这位大佬的笔记
以上是关于Quartz框架汇总的主要内容,如果未能解决你的问题,请参考以下文章