Quartz Source Code Walkthrough: How Jobs Are Executed on Schedule
Posted by 低调的洋仔
Example
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import java.util.Date;

// Note: this example uses the Quartz 1.x API, where JobDetail and
// SimpleTrigger are concrete classes configured through setters.

/**
 * Quartz timer test
 *
 * @author leizhimin 2009-7-23 8:49:01
 */
class MyJob implements Job {
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        System.out.println(new Date() + ": doing something...");
    }
}

class Test {
    public static void main(String[] args) {
        // 1. Create the JobDetail object
        JobDetail jobDetail = new JobDetail();
        // Configure the job
        jobDetail.setJobClass(MyJob.class);
        jobDetail.setName("MyJob_1");
        jobDetail.setGroup("JobGroup_1");

        // 2. Create the Trigger object
        SimpleTrigger strigger = new SimpleTrigger();
        strigger.setName("Trigger_1");
        strigger.setGroup("Trigger_Group_1");
        strigger.setStartTime(new Date());
        // Set the end time; the trigger stops repeating after it and is then discarded
        java.util.Calendar c = java.util.Calendar.getInstance();
        c.setTimeInMillis(System.currentTimeMillis() + 1000 * 1L);
        strigger.setEndTime(c.getTime());
        strigger.setFireInstanceId("Trigger_1_id_001");
        // Set the repeat interval
        strigger.setRepeatInterval(1000 * 1L);
        // Set the repeat count
        strigger.setRepeatCount(3);

        // 3. Create the Scheduler and register the JobDetail and Trigger with it
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler scheduler = null;
        try {
            scheduler = sf.getScheduler();
            scheduler.scheduleJob(jobDetail, strigger);
            // 4. Start the scheduler (and shut it down later if needed)
            scheduler.start();
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        // try {
        //     // Shut down the scheduler
        //     scheduler.shutdown(true);
        // } catch (SchedulerException e) {
        //     e.printStackTrace();
        // }
    }
}
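First, an instance of JobDetail is created. It describes the job: its class, its name, its group, and so on.
Next, a Trigger is created to describe when the job fires: start time, repeat interval, repeat count, and the like.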
Scheduler
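This class is where job scheduling happens. Broadly, creating a Scheduler also creates a QuartzSchedulerThread, which is a single scheduling thread rather than a thread pool, together with a separate worker thread pool. The scheduling thread keeps fetching the next trigger and checking whether its fire time has arrived; when it has, the job registered with that trigger is handed to the worker pool for execution.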
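The division of labor described above (one thread that decides when, a pool that does the work) can be sketched with plain JDK classes. This is only an illustration, not Quartz code: TimedTask, MiniScheduler and MiniSchedulerDemo are hypothetical names, and the sketch ignores pausing and schedule changes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a trigger: a fire time plus the work to run.
class TimedTask implements Comparable<TimedTask> {
    final long fireTimeMillis;
    final Runnable job;

    TimedTask(long fireTimeMillis, Runnable job) {
        this.fireTimeMillis = fireTimeMillis;
        this.job = job;
    }

    @Override
    public int compareTo(TimedTask other) {
        return Long.compare(fireTimeMillis, other.fireTimeMillis);
    }
}

// One scheduling thread decides when to fire; a separate pool runs the jobs.
class MiniScheduler {
    private final PriorityBlockingQueue<TimedTask> tasks = new PriorityBlockingQueue<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private final Thread schedulerThread = new Thread(this::loop, "mini-scheduler");
    private volatile boolean halted = false;

    void schedule(long delayMillis, Runnable job) {
        tasks.add(new TimedTask(System.currentTimeMillis() + delayMillis, job));
    }

    void start() {
        schedulerThread.start();
    }

    void shutdown() throws InterruptedException {
        halted = true;
        schedulerThread.interrupt();   // wake the loop if it is blocked
        schedulerThread.join();
        workers.shutdown();
        workers.awaitTermination(5, TimeUnit.SECONDS);
    }

    private void loop() {
        while (!halted) {
            try {
                TimedTask next = tasks.take();   // earliest fire time first
                long wait = next.fireTimeMillis - System.currentTimeMillis();
                if (wait > 0) {
                    Thread.sleep(wait);          // wait for the fire time
                }
                workers.execute(next.job);       // hand off; do not run the job here
            } catch (InterruptedException e) {
                // interrupted by shutdown(); the while condition ends the loop
            }
        }
    }
}

class MiniSchedulerDemo {
    // Schedules two tasks out of order and returns the order in which they ran.
    static List<String> runDemo() {
        List<String> ran = new CopyOnWriteArrayList<>();
        MiniScheduler scheduler = new MiniScheduler();
        scheduler.schedule(150, () -> ran.add("late"));
        scheduler.schedule(20, () -> ran.add("early"));
        scheduler.start();
        try {
            Thread.sleep(500);
            scheduler.shutdown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Keeping the scheduling thread free of job code means a slow job cannot delay the next fire-time check.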
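try {
    scheduler = sf.getScheduler();
    scheduler.scheduleJob(jobDetail, strigger);
    // 4. Start the scheduler (and shut it down later if needed)
    scheduler.start();
} catch (SchedulerException e) {
    e.printStackTrace();
}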
This calls the scheduler's start method:
public void start() throws SchedulerException {
    if (!this.shuttingDown && !this.closed) {
        if (this.initialStart == null) {
            // First start: notify the job store and start the plugins
            this.initialStart = new Date();
            this.resources.getJobStore().schedulerStarted();
            this.startPlugins();
        }
        // Un-pause the scheduler thread
        this.schedThread.togglePause(false);
        this.getLog().info("Scheduler " + this.resources.getUniqueIdentifier() + " started.");
    } else {
        throw new SchedulerException("The Scheduler cannot be restarted after shutdown() has been called.");
    }
}
start in turn calls togglePause:
void togglePause(boolean pause) {
    synchronized (this.sigLock) {
        this.paused = pause;
        if (this.paused) {
            this.signalSchedulingChange(0L);
        } else {
            // Resuming: wake the scheduler thread waiting on sigLock
            this.sigLock.notifyAll();
        }
    }
}
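Here notifyAll is called on sigLock, which wakes the thread waiting on that lock.
That thread is the scheduler thread (QuartzSchedulerThread), which was created before this point and is responsible for scheduling jobs.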
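The pause-and-resume handshake can be reproduced in isolation. The sketch below is a hedged illustration and not Quartz's class: PausableLoop and PausableLoopDemo are hypothetical names, and the loop is assumed to start in the paused state so that nothing happens until togglePause(false) is called.

```java
// Hypothetical names; only the wait/notifyAll pattern mirrors togglePause above.
class PausableLoop implements Runnable {
    private final Object sigLock = new Object();
    private boolean paused = true;   // assumption: the loop starts paused
    private boolean halted = false;
    private int iterations = 0;

    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            if (!paused) {
                sigLock.notifyAll();   // wake the loop thread waiting in run()
            }
        }
    }

    void halt() {
        synchronized (sigLock) {
            halted = true;
            sigLock.notifyAll();
        }
    }

    int iterations() {
        synchronized (sigLock) {
            return iterations;
        }
    }

    @Override
    public void run() {
        while (true) {
            synchronized (sigLock) {
                // Stay here while paused; the timed wait re-checks the flags periodically
                while (paused && !halted) {
                    try {
                        sigLock.wait(1000L);
                    } catch (InterruptedException ignored) {
                    }
                }
                if (halted) {
                    return;
                }
                iterations++;   // stands in for one pass of real scheduling work
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException ignored) {
            }
        }
    }
}

class PausableLoopDemo {
    // Returns {iterations while paused, iterations after resuming}.
    static int[] runDemo() {
        PausableLoop loop = new PausableLoop();
        Thread thread = new Thread(loop, "pausable-loop");
        thread.start();
        try {
            Thread.sleep(100);
            int whilePaused = loop.iterations();
            loop.togglePause(false);
            Thread.sleep(200);
            loop.halt();
            thread.join();
            return new int[] {whilePaused, loop.iterations()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[] {-1, -1};
        }
    }

    public static void main(String[] args) {
        int[] result = runDemo();
        System.out.println("while paused: " + result[0] + ", after resume: " + result[1]);
    }
}
```

The flag is changed and the notification is sent while holding the same lock the loop waits on, so the loop cannot miss the wake-up between checking the flag and calling wait.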
public void run() {
    boolean lastAcquireFailed = false;
    while (!this.halted) {
        try {
            synchronized (this.sigLock) {
                // Stay here while the scheduler is paused
                while (this.paused && !this.halted) {
                    try {
                        this.sigLock.wait(1000L);
                    } catch (InterruptedException var28) {
                        ;
                    }
                }
                if (this.halted) {
                    break;
                }
            }
            // (run() continues below)
When notifyAll is called, the waiting scheduler thread wakes up. Because togglePause was passed false, paused is now false, so the inner while loop exits and run continues:
            int availTreadCount = this.qsRsrcs.getThreadPool().blockForAvailableThreads();
            if (availTreadCount > 0) {
                Trigger trigger = null;
                long now = System.currentTimeMillis();
                this.clearSignaledSchedulingChange();
                try {
                    trigger = this.qsRsrcs.getJobStore().acquireNextTrigger(this.ctxt, now + this.idleWaitTime);
                    lastAcquireFailed = false;
                } catch (JobPersistenceException var30) {
                    // ... (elided in the original)
                } catch (RuntimeException var31) {
                    // ... (elided in the original)
                }
                if (trigger == null) {
                    // Nothing is due soon: idle for a randomized interval
                    now = System.currentTimeMillis();
                    long waitTime = now + this.getRandomizedIdleWaitTime();
                    long timeUntilContinue = waitTime - now; // keep waiting
                    synchronized (this.sigLock) {
                        try {
                            this.sigLock.wait(timeUntilContinue);
                        } catch (InterruptedException var16) {
                            ;
                        }
                    }
                } else {
                    now = System.currentTimeMillis();
                    long triggerTime = trigger.getNextFireTime().getTime();
                    // This for loop keeps recomputing the difference between triggerTime and
                    // the current time: while at least 1 ms remains it waits on sigLock, and it
                    // exits once the fire time has been reached
                    for (long timeUntilTrigger = triggerTime - now; timeUntilTrigger > 0L; timeUntilTrigger = triggerTime - now) {
                        synchronized (this.sigLock) {
                            try {
                                now = System.currentTimeMillis();
                                timeUntilTrigger = triggerTime - now;
                                if (timeUntilTrigger >= 1L) {
                                    this.sigLock.wait(timeUntilTrigger);
                                }
                            } catch (InterruptedException var26) {
                                ;
                            }
                        }
                        // The schedule changed and an earlier trigger may exist: give this one back
                        if (this.isScheduleChanged() && this.isCandidateNewTimeEarlierWithinReason(triggerTime)) {
                            try {
                                this.qsRsrcs.getJobStore().releaseAcquiredTrigger(this.ctxt, trigger);
                            } catch (JobPersistenceException var24) {
                                // ... (elided in the original)
                            }
                            trigger = null;
                            break;
                        }
                        now = System.currentTimeMillis();
                    }
                    if (trigger != null) {
                        TriggerFiredBundle bndle = null;
                        boolean goAhead = true;
                        synchronized (this.sigLock) {
                            goAhead = !this.halted;
                        }
                        if (goAhead) {
                            try {
                                bndle = this.qsRsrcs.getJobStore().triggerFired(this.ctxt, trigger);
                            } catch (SchedulerException var21) {
                                // ... (elided in the original)
                            }
                        }
                        if (bndle == null) {
                            // The trigger could not be fired: release it
                            try {
                                this.qsRsrcs.getJobStore().releaseAcquiredTrigger(this.ctxt, trigger);
                            } catch (SchedulerException var20) {
                                // ... (elided in the original)
                            }
                        } else {
                            JobRunShell shell;
                            try {
                                shell = this.qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
                                shell.initialize(this.qs, bndle);
                            } catch (SchedulerException var29) {
                                try {
                                    this.qsRsrcs.getJobStore().triggeredJobComplete(this.ctxt, trigger, bndle.getJobDetail(), 6);
                                } catch (SchedulerException var19) {
                                    // ... (elided in the original)
                                }
                                continue;
                            }
                            if (!this.qsRsrcs.getThreadPool().runInThread(shell)) { // the job is actually run here
                                try {
                                    this.getLog().error("ThreadPool.runInThread() return false!");
                                    this.qsRsrcs.getJobStore().triggeredJobComplete(this.ctxt, trigger, bndle.getJobDetail(), 6);
                                } catch (SchedulerException var18) {
                                    // ... (elided in the original)
                                }
                            }
                        }
                    }
                }
            }
        } catch (RuntimeException var33) {
            this.getLog().error("Runtime error occured in main trigger firing loop.", var33);
        }
    }
    // Drop references once the loop has ended
    this.qs = null;
    this.qsRsrcs = null;
}
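In short, the code above is a polling loop (the run method keeps executing for as long as the scheduler is running):
qsRsrcs.getJobStore().acquireNextTrigger [find the next trigger due to fire] ---->
sigLock.wait(timeUntilTrigger) [wait until its fire time] ---->
qsRsrcs.getJobStore().triggerFired(trigger) [mark the trigger as fired and build the TriggerFiredBundle] ---->
qsRsrcs.getThreadPool().runInThread(shell) [run the job on a worker thread]
If the schedule changes during the wait and an earlier trigger may exist, or if triggerFired returns null, qsRsrcs.getJobStore().releaseAcquiredTrigger(trigger) releases the trigger instead. (Quartz 2.x uses the plural methods acquireNextTriggers and triggersFired, which work on batches of triggers.)
All of this is driven by the scheduler thread, which keeps checking inside its while loop.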
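The wait step in that loop, sleeping until the fire time while staying interruptible by a schedule change, is worth isolating. The following is a minimal sketch under stated assumptions: FireTimeWaiter, waitUntil and FireTimeWaiterDemo are hypothetical names, and the sketch abandons the wait on any schedule change, whereas Quartz additionally checks whether the new candidate is actually earlier.

```java
// Hypothetical helper; only the timed wait on a shared lock mirrors the loop above.
class FireTimeWaiter {
    private final Object sigLock = new Object();
    private boolean scheduleChanged = false;

    // Called by another thread when the schedule changes (for example, a trigger is added).
    void signalSchedulingChange() {
        synchronized (sigLock) {
            scheduleChanged = true;
            sigLock.notifyAll();
        }
    }

    // Returns true if the fire time was reached, false if the wait was abandoned.
    boolean waitUntil(long triggerTimeMillis) {
        synchronized (sigLock) {
            while (true) {
                if (scheduleChanged) {
                    scheduleChanged = false;   // consume the signal
                    return false;
                }
                // Recompute on every pass: wait() may return early
                long remaining = triggerTimeMillis - System.currentTimeMillis();
                if (remaining <= 0) {
                    return true;
                }
                try {
                    sigLock.wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
    }
}

class FireTimeWaiterDemo {
    // Returns {first wait reached its time, second wait was abandoned early}.
    static boolean[] runDemo() {
        FireTimeWaiter waiter = new FireTimeWaiter();
        boolean reached = waiter.waitUntil(System.currentTimeMillis() + 50);

        // A second thread reports a schedule change 50 ms into a 5 s wait.
        Thread signaller = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
            }
            waiter.signalSchedulingChange();
        });
        long start = System.currentTimeMillis();
        signaller.start();
        boolean reachedSecond = waiter.waitUntil(start + 5000);
        long elapsed = System.currentTimeMillis() - start;
        return new boolean[] {reached, !reachedSecond && elapsed < 2000};
    }

    public static void main(String[] args) {
        boolean[] result = runDemo();
        System.out.println("reached: " + result[0] + ", abandoned early: " + result[1]);
    }
}
```

Waiting on a lock instead of calling Thread.sleep is what lets a newly added, earlier trigger cut the wait short.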
The default JobStore is RAMJobStore, which keeps every trigger wrapper (TriggerWrapper) in a TreeSet (the timeTriggers field in the code below).
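A sorted set is a natural fit because the scheduler only ever needs the entry with the earliest fire time. The sketch below illustrates the idea with hypothetical classes (FireEntry, FireTimeQueue, FireTimeQueueDemo); it is not RAMJobStore's actual code, and the tie-break on the name is an assumption made so that two entries with the same fire time can coexist in the set.

```java
import java.util.Comparator;
import java.util.TreeSet;

// Hypothetical stand-in for a trigger wrapper.
class FireEntry {
    final String name;
    final long nextFireTime;

    FireEntry(String name, long nextFireTime) {
        this.name = name;
        this.nextFireTime = nextFireTime;
    }
}

class FireTimeQueue {
    // Sorted by fire time; the name breaks ties so distinct entries never compare as equal.
    private final TreeSet<FireEntry> timeTriggers = new TreeSet<>(
            Comparator.comparingLong((FireEntry e) -> e.nextFireTime)
                      .thenComparing((FireEntry e) -> e.name));

    void add(FireEntry entry) {
        timeTriggers.add(entry);
    }

    // Removes and returns the earliest entry if it fires no later than the limit, else null.
    FireEntry acquireNext(long noLaterThan) {
        if (timeTriggers.isEmpty()) {
            return null;
        }
        FireEntry first = timeTriggers.first();
        if (first.nextFireTime > noLaterThan) {
            return null;
        }
        timeTriggers.remove(first);
        return first;
    }
}

class FireTimeQueueDemo {
    static String runDemo() {
        FireTimeQueue queue = new FireTimeQueue();
        queue.add(new FireEntry("b", 300));
        queue.add(new FireEntry("a", 100));
        queue.add(new FireEntry("c", 200));
        FireEntry first = queue.acquireNext(150);    // "a", due at 100
        FireEntry second = queue.acquireNext(150);   // null: the earliest left is "c" at 200
        FireEntry third = queue.acquireNext(250);    // "c"
        return first.name + "," + (second == null ? "none" : second.name) + "," + third.name;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());   // prints a,none,c
    }
}
```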
public TriggerFiredBundle triggerFired(SchedulingContext ctxt, Trigger trigger) {
    synchronized (this.lock) {
        TriggerWrapper tw = (TriggerWrapper) this.triggersByFQN.get(TriggerWrapper.getTriggerNameKey(trigger));
        if (tw != null && tw.trigger != null) {
            // The trigger must still be in state 1 (presumably "acquired"), otherwise it is not fired
            if (tw.state != 1) {
                return null;
            } else {
                Calendar cal = null;
                if (tw.trigger.getCalendarName() != null) {
                    cal = this.retrieveCalendar(ctxt, tw.trigger.getCalendarName());
                    if (cal == null) {
                        return null;
                    }
                }
                Date prevFireTime = trigger.getPreviousFireTime();
                this.timeTriggers.remove(tw);
                // Advance both the stored copy and the caller's copy to the next fire time
                tw.trigger.triggered(cal);
                trigger.triggered(cal);
                tw.state = 0;
                TriggerFiredBundle bndle = new TriggerFiredBundle(this.retrieveJob(ctxt, trigger.getJobName(), trigger.getJobGroup()), trigger, cal, false, new Date(), trigger.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
                JobDetail job = bndle.getJobDetail();
                if (job.isStateful()) {
                    // Stateful job: change the state of all of its triggers and take them out of the time-ordered set
                    ArrayList trigs = this.getTriggerWrappersForJob(job.getName(), job.getGroup());
                    TriggerWrapper ttw;
                    for (Iterator itr = trigs.iterator(); itr.hasNext(); this.timeTriggers.remove(ttw)) {
                        ttw = (TriggerWrapper) itr.next();
                        if (ttw.state == 0) {
                            ttw.state = 5;
                        }
                        if (ttw.state == 4) {
                            ttw.state = 6;
                        }
                    }
                    this.blockedJobs.add(JobWrapper.getJobNameKey(job));
                } else if (tw.trigger.getNextFireTime() != null) {
                    // The trigger fires again later: put it back into the time-ordered set
                    synchronized (this.lock) {
                        this.timeTriggers.add(tw);
                    }
                }
                return bndle;
            }
        } else {
            return null;
        }
    }
}
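The job is not actually executed here; this method only packages the firing information into a TriggerFiredBundle.
Execution happens in SimpleThreadPool: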
public boolean runInThread(Runnable runnable) {
    if (runnable == null) {
        return false;
    } else {
        synchronized (this.nextRunnableLock) {
            this.handoffPending = true;
            // Wait until a worker thread is available
            while (this.availWorkers.size() < 1 && !this.isShutdown) {
                try {
                    this.nextRunnableLock.wait(500L);
                } catch (InterruptedException var5) {
                    ;
                }
            }
            SimpleThreadPool.WorkerThread wt;
            if (!this.isShutdown) {
                // Move a worker from the available list to the busy list and hand it the task
                wt = (SimpleThreadPool.WorkerThread) this.availWorkers.removeFirst();
                this.busyWorkers.add(wt);
                wt.run(runnable);
            } else {
                // The pool is shutting down: run the task in one extra worker thread
                wt = new SimpleThreadPool.WorkerThread(this, this.threadGroup, "WorkerThread-LastJob", this.prio, this.isMakeThreadsDaemons(), runnable);
                this.busyWorkers.add(wt);
                this.workers.add(wt);
                wt.start();
            }
            this.nextRunnableLock.notifyAll();
            this.handoffPending = false;
            return true;
        }
    }
}
A free worker thread is taken from the pool and the task is handed to it to run.
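The hand-off pattern (block until a worker is free, give it the task, let the worker put itself back on the free list when done) can be sketched as follows. TinyPool and TinyPoolDemo are hypothetical names, and the sketch simplifies the shutdown path: it rejects the task instead of starting an extra thread as the code above does.

```java
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature pool; only the hand-off pattern mirrors runInThread above.
class TinyPool {
    private final Object nextRunnableLock = new Object();
    private final LinkedList<Worker> availWorkers = new LinkedList<>();
    private boolean isShutdown = false;

    TinyPool(int size) {
        synchronized (nextRunnableLock) {
            for (int i = 0; i < size; i++) {
                Worker worker = new Worker();
                worker.setDaemon(true);
                availWorkers.add(worker);
                worker.start();
            }
        }
    }

    // Blocks until a worker is free, then hands the runnable to that worker.
    boolean runInThread(Runnable runnable) {
        if (runnable == null) {
            return false;
        }
        synchronized (nextRunnableLock) {
            while (availWorkers.isEmpty() && !isShutdown) {
                try {
                    nextRunnableLock.wait(500L);
                } catch (InterruptedException ignored) {
                }
            }
            if (isShutdown) {
                return false;   // simplification: the task is rejected
            }
            availWorkers.removeFirst().task = runnable;
            nextRunnableLock.notifyAll();   // wake the chosen worker
        }
        return true;
    }

    void shutdown() {
        synchronized (nextRunnableLock) {
            isShutdown = true;
            nextRunnableLock.notifyAll();
        }
    }

    private class Worker extends Thread {
        Runnable task;   // guarded by nextRunnableLock

        @Override
        public void run() {
            while (true) {
                Runnable toRun;
                synchronized (nextRunnableLock) {
                    while (task == null && !isShutdown) {
                        try {
                            nextRunnableLock.wait(500L);
                        } catch (InterruptedException ignored) {
                        }
                    }
                    if (task == null) {
                        return;   // shut down with nothing left to run
                    }
                    toRun = task;
                    task = null;
                }
                try {
                    toRun.run();
                } catch (RuntimeException e) {
                    // a failing task must not kill the worker
                }
                synchronized (nextRunnableLock) {
                    availWorkers.add(this);         // back on the free list
                    nextRunnableLock.notifyAll();   // wake a blocked runInThread
                }
            }
        }
    }
}

class TinyPoolDemo {
    // Runs 5 short tasks on 2 workers; returns {tasks completed, max tasks running at once}.
    static int[] runDemo() {
        TinyPool pool = new TinyPool(2);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            pool.runInThread(() -> {
                maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(30);
                } catch (InterruptedException ignored) {
                }
                running.decrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return new int[] {5 - (int) done.getCount(), maxRunning.get()};
    }
}
```

Because runInThread blocks when no worker is free, the caller (in Quartz, the scheduler thread) is throttled to the pool's capacity instead of queueing work without bound.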