Quartz任务调度源码分析

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Quartz任务调度源码分析相关的知识,希望对你有一定的参考价值。

从源码分析中可以看出,任务的整个调度过程为,初始化线程池,及调度器QuartzScheduler,然后由线程池去执行QuartzSchedulerThread,将触发器任务(job与触发器)添加到存储器(TreeSet,timeTrriger)中,然后启动调度器,QuartzSchedulerThread从timeTrriger去除待触发的任务,并包装成TriggerFiredBundle,然后由JobRunShellFactory 
创建TriggerFiredBundle的执行线程JobRunShell, 调度执行通过线程池SimpleThreadPool去执行JobRunShell,而JobRunShell执行的就是job.execute(JobExecutionContext context)。Quartz主要中的集合类有ArrayList,LinkedList,HashMap,TreeSet(TreeMap);之所以用到上面四个集合类,主要用到集合的如下特点:ArrayList访问速度快,LinkedList添加删除元素快;HashMap添加删除快,TreeSet访问速度快。

触发任务创建工厂类 

Java代码  下载 

  1. public class JTAJobRunShellFactory  

  2.     implements JobRunShellFactory  

  3. {  

  4.     public void initialize(Scheduler sched)  

  5.         throws SchedulerConfigException  

  6.     {  

  7.         scheduler = sched;  

  8.     }  

  9.     public JobRunShell createJobRunShell(TriggerFiredBundle bundle)  

  10.         throws SchedulerException  

  11.     {  

  12.         return new JTAJobRunShell(scheduler, bundle);  

  13.     }  

  14.     private Scheduler scheduler;  

  15. }  


//触发任务运行类 

Java代码  下载 

  1. public class JTAJobRunShell extends JobRunShell  

  2. {  

  3.   

  4.     public JTAJobRunShell(Scheduler scheduler, TriggerFiredBundle bndle)  

  5.     {  

  6.         super(scheduler, bndle);  

  7.         transactionTimeout = null;  

  8.     }  

  9. }  

  10. public class JobRunShell extends SchedulerListenerSupport  

  11.     implements Runnable  

  12. {  

  13. public JobRunShell(Scheduler scheduler, TriggerFiredBundle bndle)  

  14.     {  

  15.         jec = null;  

  16.         qs = null;  

  17.         firedTriggerBundle = null;  

  18.         this.scheduler = null;  

  19.         shutdownRequested = false;  

  20.         this.scheduler = scheduler;  

  21.         firedTriggerBundle = bndle;  

  22.     }  

  23.      public void run()  

  24.     {  

  25.         //添加到内部监听器  

  26.         qs.addInternalSchedulerListener(this);  

  27. label0:  

  28.         {  

  29.        //protected JobExecutionContextImpl jec,job执行上下文  

  30.             OperableTrigger trigger = (OperableTrigger)jec.getTrigger();  

  31.             JobDetail jobDetail = jec.getJobDetail();  

  32.             org.quartz.Trigger.CompletedExecutionInstruction instCode;  

  33.             do  

  34.             {  

  35.                 JobExecutionException jobExEx = null;  

  36.                 Job job = jec.getJobInstance();  

  37.                 try  

  38.                 {  

  39.                     begin();  

  40.                 }  

  41.                 catch(SchedulerException se)  

  42.                 {  

  43.                     qs.notifySchedulerListenersError((new StringBuilder()).append("Error executing Job (").append(jec.getJobDetail().getKey()).append(": couldn‘t begin execution.").toString(), se);  

  44.                     break label0;  

  45.                 }  

  46.                 try  

  47.                 {  

  48.                     if(!notifyListenersBeginning(jec))  

  49.                         break label0;  

  50.                 }  

  51.                 catch(VetoedException ve)  

  52.                 {  

  53.                     try  

  54.                     {  

  55.                         org.quartz.Trigger.CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);  

  56.                         qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);  

  57.                         if(jec.getTrigger().getNextFireTime() == null)  

  58.                             qs.notifySchedulerListenersFinalized(jec.getTrigger());  

  59.                         complete(true);  

  60.                     }  

  61.                     catch(SchedulerException se)  

  62.                     {  

  63.                         qs.notifySchedulerListenersError((new StringBuilder()).append("Error during veto of Job (").append(jec.getJobDetail().getKey()).append(": couldn‘t finalize execution.").toString(), se);  

  64.                     }  

  65.                     break label0;  

  66.                 }  

  67.                 long startTime = System.currentTimeMillis();  

  68.                 long endTime = startTime;  

  69.                 try  

  70.                 {  

  71.                     log.debug((new StringBuilder()).append("Calling execute on job ").append(jobDetail.getKey()).toString());  

  72.                     //执行Job,关键  

  73.             job.execute(jec);  

  74.                     endTime = System.currentTimeMillis();  

  75.                 }  

  76.                 catch(JobExecutionException jee)  

  77.                 {  

  78.                     endTime = System.currentTimeMillis();  

  79.                     jobExEx = jee;  

  80.                     getLog().info((new StringBuilder()).append("Job ").append(jobDetail.getKey()).append(" threw a JobExecutionException: ").toString(), jobExEx);  

  81.                 }  

  82.                 catch(Throwable e)  

  83.                 {  

  84.                     endTime = System.currentTimeMillis();  

  85.                     getLog().error((new StringBuilder()).append("Job ").append(jobDetail.getKey()).append(" threw an unhandled Exception: ").toString(), e);  

  86.                     SchedulerException se = new SchedulerException("Job threw an unhandled exception.", e);  

  87.                     qs.notifySchedulerListenersError((new StringBuilder()).append("Job (").append(jec.getJobDetail().getKey()).append(" threw an exception.").toString(), se);  

  88.                     jobExEx = new JobExecutionException(se, false);  

  89.                 }  

  90.         //设置jJobExecutionContext运行时间  

  91.                 jec.setJobRunTime(endTime - startTime);  

  92.                 if(!notifyJobListenersComplete(jec, jobExEx))  

  93.                     break label0;  

  94.                 instCode = org.quartz.Trigger.CompletedExecutionInstruction.NOOP;  

  95.                 try  

  96.                 {  

  97.                     instCode = trigger.executionComplete(jec, jobExEx);  

  98.                 }  

  99.                 catch(Exception e)  

  100.                 {  

  101.                     SchedulerException se = new SchedulerException("Trigger threw an unhandled exception.", e);  

  102.                     qs.notifySchedulerListenersError("Please report this error to the Quartz developers.", se);  

  103.                 }  

  104.                 if(!notifyTriggerListenersComplete(jec, instCode))  

  105.                     break label0;  

  106.                 if(instCode == org.quartz.Trigger.CompletedExecutionInstruction.RE_EXECUTE_JOB)  

  107.                 {  

  108.                     jec.incrementRefireCount();  

  109.                     try  

  110.                     {  

  111.                         complete(false);  

  112.                     }  

  113.                     catch(SchedulerException se)  

  114.                     {  

  115.                         qs.notifySchedulerListenersError((new StringBuilder()).append("Error executing Job (").append(jec.getJobDetail().getKey()).append(": couldn‘t finalize execution.").toString(), se);  

  116.                     }  

  117.                     continue;  

  118.                 }  

  119.                 try  

  120.                 {  

  121.                     complete(true);  

  122.                     break;  

  123.                 }  

  124.                 catch(SchedulerException se)  

  125.                 {  

  126.                     qs.notifySchedulerListenersError((new StringBuilder()).append("Error executing Job (").append(jec.getJobDetail().getKey()).append(": couldn‘t finalize execution.").toString(), se);  

  127.                 }  

  128.             } while(true);  

  129.         //通知job执行完成  

  130.             qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);  

  131.         }  

  132.         qs.removeInternalSchedulerListener(this);  

  133.         break MISSING_BLOCK_LABEL_710;  

  134.         Exception exception;  

  135.         exception;  

  136.         qs.removeInternalSchedulerListener(this);  

  137.         throw exception;  

  138.     }  

  139.     protected JobExecutionContextImpl jec;//job执行上下文  

  140.     protected QuartzScheduler qs;  

  141.     protected TriggerFiredBundle firedTriggerBundle;  

  142.     protected Scheduler scheduler;  

  143.     protected volatile boolean shutdownRequested;  

  144.     private final Logger log = LoggerFactory.getLogger(getClass());  

  145. }  


//TriggerKey,JobKey包装类 

Java代码  下载 

  1. class TriggerWrapper  

  2. {  

  3.   

  4.     TriggerWrapper(OperableTrigger trigger)  

  5.     {  

  6.         state = 0;  

  7.         if(trigger == null)  

  8.         {  

  9.             throw new IllegalArgumentException("Trigger cannot be null!");  

  10.         } else  

  11.         {  

  12.             this.trigger = trigger;  

  13.             key = trigger.getKey();  

  14.             jobKey = trigger.getJobKey();  

  15.             return;  

  16.         }  

  17.     }  

  18.   

  19.     public boolean equals(Object obj)  

  20.     {  

  21.         if(obj instanceof TriggerWrapper)  

  22.         {  

  23.             TriggerWrapper tw = (TriggerWrapper)obj;  

  24.             if(tw.key.equals(key))  

  25.                 return true;  

  26.         }  

  27.         return false;  

  28.     }  

  29.   

  30.     public int hashCode()  

  31.     {  

  32.         return key.hashCode();  

  33.     }  

  34.   

  35.     public OperableTrigger getTrigger()  

  36.     {  

  37.         return trigger;  

  38.     }  

  39.   

  40.     public final TriggerKey key;  

  41.     public final JobKey jobKey;  

  42.     public final OperableTrigger trigger;  

  43.     public int state;  

  44.     public static final int STATE_WAITING = 0;//等待  

  45.     public static final int STATE_ACQUIRED = 1;//就绪  

  46.     public static final int STATE_EXECUTING = 2;//执行  

  47.     public static final int STATE_COMPLETE = 3;//完成  

  48.     public static final int STATE_PAUSED = 4;//暂停  

  49.     public static final int STATE_BLOCKED = 5;//阻塞  

  50.     public static final int STATE_PAUSED_BLOCKED = 6;//暂停阻塞  

  51.     public static final int STATE_ERROR = 7;//错误  

  52. }  


//简单触发器 

Java代码  下载 

  1. public class SimpleTriggerImpl extends AbstractTrigger  

  2.     implements SimpleTrigger, CoreTrigger  

  3. {  

  4.  //获取下一次触发时间  

  5.  public Date getNextFireTime()  

  6.     {  

  7.         return nextFireTime;  

  8.     }  

  9.  private Date startTime;  

  10.     private Date endTime;  

  11.     private Date nextFireTime;  

  12.     private Date previousFireTime;  

  13.     private int repeatCount;  

  14.     private long repeatInterval;  

  15.     private int timesTriggered;  

  16.     private boolean complete;  

  17.   

  18. }  


//触发任务包装类 

Java代码  下载 

  1. public class TriggerFiredBundle  

  2.     implements Serializable  

  3. {  

  4. public TriggerFiredBundle(JobDetail job, OperableTrigger trigger, Calendar cal, boolean jobIsRecovering, Date fireTime, Date scheduledFireTime, Date prevFireTime,   

  5.             Date nextFireTime)  

  6.     {  

  7.         this.job = job;  

  8.         this.trigger = trigger;  

  9.         this.cal = cal;  

  10.         this.jobIsRecovering = jobIsRecovering;  

  11.         this.fireTime = fireTime;  

  12.         this.scheduledFireTime = scheduledFireTime;  

  13.         this.prevFireTime = prevFireTime;  

  14.         this.nextFireTime = nextFireTime;  

  15.     }  

  16.     private JobDetail job;  

  17.     private OperableTrigger trigger;  

  18.     private Calendar cal;  

  19.     private boolean jobIsRecovering;  

  20.     private Date fireTime;  

  21.     private Date scheduledFireTime;  

  22.     private Date prevFireTime;  

  23.     private Date nextFireTime;  

  24. }  


//触发任务包装结果类 

Java代码  下载 

  1. public class TriggerFiredResult  

  2. {  

  3.     public TriggerFiredResult(TriggerFiredBundle triggerFiredBundle)  

  4.     {  

  5.         this.triggerFiredBundle = triggerFiredBundle;  

  6.     }  

  7.     private TriggerFiredBundle triggerFiredBundle;  

  8.     private Exception exception;  

  9. }  


以上是关于Quartz任务调度源码分析的主要内容,如果未能解决你的问题,请参考以下文章

一文揭秘定时任务调度框架quartz

[源码分析] Quartz 故障切换

xxx-job调度中心源码分析

TencentOS tiny深度源码分析——调度器

XXL-JOB分布式任务调度框架-源码分析-任务调度执行流程及实现原理

源码分析 Datax 调度以及数据传输流程