Quartz源码解读-任务是如何定时执行的

Posted 低调的洋仔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Quartz源码解读-任务是如何定时执行的相关的知识,希望对你有一定的参考价值。

例子

 

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

import java.util.Date;

/**
 * quartz定时器测试
 *
 * @author leizhimin 2009-7-23 8:49:01
 */
class MyJob implements Job 
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException 
        System.out.println(new Date() + ": doing something...");
    


class Test 
    public static void main(String[] args) 
        //1、创建JobDetial对象
        JobDetail jobDetail = new JobDetail();
        //设置工作项
        jobDetail.setJobClass(MyJob.class);
        jobDetail.setName("MyJob_1");
        jobDetail.setGroup("JobGroup_1");

        //2、创建Trigger对象
        SimpleTrigger strigger = new SimpleTrigger();
        strigger.setName("Trigger_1");
        strigger.setGroup("Trigger_Group_1");
        strigger.setStartTime(new Date());
        //设置重复停止时间,并销毁该Trigger对象
        java.util.Calendar c = java.util.Calendar.getInstance();
        c.setTimeInMillis(System.currentTimeMillis() + 1000 * 1L);
        strigger.setEndTime(c.getTime());
        strigger.setFireInstanceId("Trigger_1_id_001");
        //设置重复间隔时间
        strigger.setRepeatInterval(1000 * 1L);
        //设置重复执行次数
        strigger.setRepeatCount(3);

        //3、创建Scheduler对象,并配置JobDetail和Trigger对象
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler scheduler = null;
        try 
            scheduler = sf.getScheduler();
            scheduler.scheduleJob(jobDetail, strigger);
            //4、并执行启动、关闭等操作
            scheduler.start();

         catch (SchedulerException e) 
            e.printStackTrace();
        
//                try  
//                        //关闭调度器 
//                        scheduler.shutdown(true); 
//                 catch (SchedulerException e)  
//                        e.printStackTrace(); 
//                 
    

 

 

先创建好JobDetail类的实例,这个实例主要是用于描述Job的详细信息,包括类,name和group是什么等等的用于描述这个job的相关信息。

接着创建了一个Trigger用于描述触发器的,执行时间啊,频次啊之类的。

 

Scheduler

这个类中主要涉及到了任务的调度了,那么总的来看的话,在创建一个Scheduler的时候,会创建QuartzSchedulerThread的线程池,然后这个线程池不断的取出trigger来判断这个是不是已经到了执行的时间了,然后会调用已经设置进来的job执行。

 

try 
            scheduler = sf.getScheduler();
            scheduler.scheduleJob(jobDetail, strigger);
            //4、并执行启动、关闭等操作
            scheduler.start();


调用这个scheduler的start方法、

 

 

public void start() throws SchedulerException 
        if (!this.shuttingDown && !this.closed) 
            if (this.initialStart == null) 
                this.initialStart = new Date();
                this.resources.getJobStore().schedulerStarted();
                this.startPlugins();
            

            this.schedThread.togglePause(false);
            this.getLog().info("Scheduler " + this.resources.getUniqueIdentifier() + " started.");
         else 
            throw new SchedulerException("The Scheduler cannot be restarted after shutdown() has been called.");
        
    


运行togglePause方法

 

 

void togglePause(boolean pause) 
        Object var2 = this.sigLock;
        synchronized(this.sigLock) 
            this.paused = pause;
            if (this.paused) 
                this.signalSchedulingChange(0L);
             else 
                this.sigLock.notifyAll();
            

        
    

这里调用了sigLock的notifyAll方法来运行。

 

再次之前已经创建了一个线程池,然后用于任务的调度。

 

public void run() 
        boolean lastAcquireFailed = false;

        while(!this.halted) 
            try 
                Object var2 = this.sigLock;
                synchronized(this.sigLock) 
                    while(this.paused && !this.halted) 
                        try 
                            this.sigLock.wait(1000L);
                         catch (InterruptedException var28) 
                            ;
                        
                    

                    if (this.halted) 
                        break;
                    
                


当刚才的方法notifyAll的时候,会激活线程去运行,同时传入的参数为false,然后,既然传入了false,在唤醒的时候,这个循环就会退出了。

 

 

int availTreadCount = this.qsRsrcs.getThreadPool().blockForAvailableThreads();
                if (availTreadCount > 0) 
                    Trigger trigger = null;
                    long now = System.currentTimeMillis();
                    this.clearSignaledSchedulingChange();

                    try 
                        trigger = this.qsRsrcs.getJobStore().acquireNextTrigger(this.ctxt, now + this.idleWaitTime);
                        lastAcquireFailed = false;
                     catch (JobPersistenceException var30) 
                       。。。
                     catch (RuntimeException var31) 
                       。。。
                    

                    if (trigger == null) 
                        long now = System.currentTimeMillis();
                        long waitTime = now + this.getRandomizedIdleWaitTime();
                        long timeUntilContinue = waitTime - now; // 继续等待
                        Object var9 = this.sigLock;
                        synchronized(this.sigLock) 
                            try 
                                this.sigLock.wait(timeUntilContinue);
                             catch (InterruptedException var16) 
                                ;
                            
                        
                     else 
                        now = System.currentTimeMillis();
                        long triggerTime = trigger.getNextFireTime().getTime();
			// 这里是一个for循环,然后遍历triggerTime时间不断的计算与当前时间的差值,如果当前差值>1可以休眠处理否则就退出循环
                        for(long timeUntilTrigger = triggerTime - now; timeUntilTrigger > 0L; timeUntilTrigger = triggerTime - now) 
                            Object var10 = this.sigLock;
                            synchronized(this.sigLock) 
                                try 
                                    now = System.currentTimeMillis();
                                    timeUntilTrigger = triggerTime - now;
                                    if (timeUntilTrigger >= 1L) 
                                        this.sigLock.wait(timeUntilTrigger);
                                    
                                 catch (InterruptedException var26) 
                                    ;
                                
                            

                            if (this.isScheduleChanged() && this.isCandidateNewTimeEarlierWithinReason(triggerTime)) 
                                try 
                                    this.qsRsrcs.getJobStore().releaseAcquiredTrigger(this.ctxt, trigger);
                                 catch (JobPersistenceException var24) 
                                   。。。。
                                

                                trigger = null;
                                break;
                            

                            now = System.currentTimeMillis();
                        

                        if (trigger != null) 
                            TriggerFiredBundle bndle = null;
                            boolean goAhead = true;
                            Object var12 = this.sigLock;
                            synchronized(this.sigLock) 
                                goAhead = !this.halted;
                            

                            if (goAhead) 
                                try 
                                    bndle = this.qsRsrcs.getJobStore().triggerFired(this.ctxt, trigger);
                                 catch (SchedulerException var21) 
                                    。。。。
                                
                            

                            if (bndle == null) 
                                try 
                                    this.qsRsrcs.getJobStore().releaseAcquiredTrigger(this.ctxt, trigger);
                                 catch (SchedulerException var20) 
                                    。。。。
                                
                             else 
                                var12 = null;

                                JobRunShell shell;
                                try 
                                    shell = this.qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
                                    shell.initialize(this.qs, bndle);
                                 catch (SchedulerException var29) 
                                    try 
                                        this.qsRsrcs.getJobStore().triggeredJobComplete(this.ctxt, trigger, bndle.getJobDetail(), 6);
                                     catch (SchedulerException var19) 
                                        。。。。
                                    
                                    continue;
                                

                                if (!this.qsRsrcs.getThreadPool().runInThread(shell)) // 真正执行在这里
                                    try 
                                        this.getLog().error("ThreadPool.runInThread() return false!");
                                        this.qsRsrcs.getJobStore().triggeredJobComplete(this.ctxt, trigger, bndle.getJobDetail(), 6);
                                     catch (SchedulerException var18) 
                                        。。。。。
                                    
                                
                            
                        
                    
                
             catch (RuntimeException var33) 
                this.getLog().error("Runtime error occured in main trigger firing loop.", var33);
            
        

        this.qs = null;
        this.qsRsrcs = null;
    acquireNextTrigger(this.ctxt, now + this.idleWaitTime);
                        lastAcquireFailed = false;
                     catch (JobPersistenceException var30) 
                       。。。
                     catch (RuntimeException var31) 
                       。。。
                    

                    if (trigger == null) 
                        long now = System.currentTimeMillis();
                        long waitTime = now + this.getRandomizedIdleWaitTime();
                        long timeUntilContinue = waitTime - now; // 继续等待
                        Object var9 = this.sigLock;
                        synchronized(this.sigLock) 
                            try 
                                this.sigLock.wait(timeUntilContinue);
                             catch (InterruptedException var16) 
                                ;
                            
                        
                     else 
                        now = System.currentTimeMillis();
                        long triggerTime = trigger.getNextFireTime().getTime();
			// 这里是一个for循环,然后遍历triggerTime时间不断的计算与当前时间的差值,如果当前差值>1可以休眠处理否则就退出循环
                        for(long timeUntilTrigger = triggerTime - now; timeUntilTrigger > 0L; timeUntilTrigger = triggerTime - now) 
                            Object var10 = this.sigLock;
                            synchronized(this.sigLock) 
                                try 
                                    now = System.currentTimeMillis();
                                    timeUntilTrigger = triggerTime - now;
                                    if (timeUntilTrigger >= 1L) 
                                        this.sigLock.wait(timeUntilTrigger);
                                    
                                 catch (InterruptedException var26) 
                                    ;
                                
                            

                            if (this.isScheduleChanged() && this.isCandidateNewTimeEarlierWithinReason(triggerTime)) 
                                try 
                                    this.qsRsrcs.getJobStore().releaseAcquiredTrigger(this.ctxt, trigger);
                                 catch (JobPersistenceException var24) 
                                   。。。。
                                

                                trigger = null;
                                break;
                            

                            now = System.currentTimeMillis();
                        

                        if (trigger != null) 
                            TriggerFiredBundle bndle = null;
                            boolean goAhead = true;
                            Object var12 = this.sigLock;
                            synchronized(this.sigLock) 
                                goAhead = !this.halted;
                            

                            if (goAhead) 
                                try 
                                    bndle = this.qsRsrcs.getJobStore().triggerFired(this.ctxt, trigger);
                                 catch (SchedulerException var21) 
                                    。。。。
                                
                            

                            if (bndle == null) 
                                try 
                                    this.qsRsrcs.getJobStore().releaseAcquiredTrigger(this.ctxt, trigger);
                                 catch (SchedulerException var20) 
                                    。。。。
                                
                             else 
                                var12 = null;

                                JobRunShell shell;
                                try 
                                    shell = this.qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
                                    shell.initialize(this.qs, bndle);
                                 catch (SchedulerException var29) 
                                    try 
                                        this.qsRsrcs.getJobStore().triggeredJobComplete(this.ctxt, trigger, bndle.getJobDetail(), 6);
                                     catch (SchedulerException var19) 
                                        。。。。
                                    
                                    continue;
                                

                                if (!this.qsRsrcs.getThreadPool().runInThread(shell)) // 真正执行在这里
                                    try 
                                        this.getLog().error("ThreadPool.runInThread() return false!");
                                        this.qsRsrcs.getJobStore().triggeredJobComplete(this.ctxt, trigger, bndle.getJobDetail(), 6);
                                     catch (SchedulerException var18) 
                                        。。。。。
                                    
                                
                            
                        
                    
                
             catch (RuntimeException var33) 
                this.getLog().error("Runtime error occured in main trigger firing loop.", var33);
            
        

        this.qs = null;
        this.qsRsrcs = null;
    


以上代码基本上做的就是以下轮询(服务器启动后不断地执行run方法): 
qsRsrcs.getJobStore().acquireNextTriggers【查找即将触发的Trigger】 ----> 
sigLock.wait(timeUntilTrigger)【等待执行】 ----> 
qsRsrcs.getJobStore().triggersFired(triggers)【执行】----> 

 

qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)) 【释放Trigger】

 

 

 

这个地方其实是Scheduler线程来调度的,在while的死循环里面不断的进行检查的。

然后调用的默认JobStore用的是RAMJobStore,在这里面实际上使用的是TreeSet来存储的每一个触发器包装器。

 

 public TriggerFiredBundle triggerFired(SchedulingContext ctxt, Trigger trigger) 
        Object var3 = this.lock;
        synchronized(this.lock) 
            TriggerWrapper tw = (TriggerWrapper)this.triggersByFQN.get(TriggerWrapper.getTriggerNameKey(trigger));
            if (tw != null && tw.trigger != null) 
                if (tw.state != 1) 
                    return null;
                 else 
                    Calendar cal = null;
                    if (tw.trigger.getCalendarName() != null) 
                        cal = this.retrieveCalendar(ctxt, tw.trigger.getCalendarName());
                        if (cal == null) 
                            return null;
                        
                    

                    Date prevFireTime = trigger.getPreviousFireTime();
                    this.timeTriggers.remove(tw);
                    tw.trigger.triggered(cal);
                    trigger.triggered(cal);
                    tw.state = 0;
                    TriggerFiredBundle bndle = new TriggerFiredBundle(this.retrieveJob(ctxt, trigger.getJobName(), trigger.getJobGroup()), trigger, cal, false, new Date(), trigger.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
                    JobDetail job = bndle.getJobDetail();
                    if (job.isStateful()) 
                        ArrayList trigs = this.getTriggerWrappersForJob(job.getName(), job.getGroup());

                        TriggerWrapper ttw;
                        for(Iterator itr = trigs.iterator(); itr.hasNext(); this.timeTriggers.remove(ttw)) 
                            ttw = (TriggerWrapper)itr.next();
                            if (ttw.state == 0) 
                                ttw.state = 5;
                            

                            if (ttw.state == 4) 
                                ttw.state = 6;
                            
                        

                        this.blockedJobs.add(JobWrapper.getJobNameKey(job));
                     else if (tw.trigger.getNextFireTime() != null) 
                        Object var16 = this.lock;
                        synchronized(this.lock) 
                            this.timeTriggers.add(tw);
                        
                    

                    return bndle;
                
             else 
                return null;
            
        
    

实际执行不在这里,这里只是做了一个封装。

 

 

在SimpleThreadPool里面执行

 

public boolean runInThread(Runnable runnable) 
        if (runnable == null) 
            return false;
         else 
            Object var2 = this.nextRunnableLock;
            synchronized(this.nextRunnableLock) 
                this.handoffPending = true;

                while(this.availWorkers.size() < 1 && !this.isShutdown) 
                    try 
                        this.nextRunnableLock.wait(500L);
                     catch (InterruptedException var5) 
                        ;
                    
                

                SimpleThreadPool.WorkerThread wt;
                if (!this.isShutdown) 
                    wt = (SimpleThreadPool.WorkerThread)this.availWorkers.removeFirst();
                    this.busyWorkers.add(wt);
                    wt.run(runnable);
                 else 
                    wt = new SimpleThreadPool.WorkerThread(this, this.threadGroup, "WorkerThread-LastJob", this.prio, this.isMakeThreadsDaemons(), runnable);
                    this.busyWorkers.add(wt);
                    this.workers.add(wt);
                    wt.start();
                

                this.nextRunnableLock.notifyAll();
                this.handoffPending = false;
                return true;
            
        
    


获取一个线程来运行,然后这个任务会进行启动然后去执行。

 

 

 

 

 


 

以上是关于Quartz源码解读-任务是如何定时执行的的主要内容,如果未能解决你的问题,请参考以下文章

一文揭秘定时任务调度框架quartz

quartz spring 实现动态定时任务

应该如何正确使用Quartz

Quartz定时任务的并行与串行

quartz 定时任务第一次不执行

深入Quartz,更优雅地管理你的定时任务