Java Quartz scheduled job: several threads run the job at the same time on every fire
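The job is scheduled to fire once per second, but every time it fires several threads run it at once: as few as 3, and sometimes as many as 8.
Can anyone tell me what causes this? Is there a way to make sure only one thread runs the job each time?
Job configuration file: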
<job>
<job-detail>
<name>ScanClearJob</name>
<group>SMSGroup</group>
<job-class>com.forms.sms.service.ScanClearJob</job-class>
<job-data-map allows-transient-data="true">
</job-data-map>
</job-detail>
<trigger>
<cron>
<name>ScanClearTrigger</name>
<group>SMSGroup</group>
<job-name>ScanClearJob</job-name>
<job-group>SMSGroup</job-group>
<cron-expression>0/1 * * * * ?</cron-expression><!-- fire every second -->
</cron>
</trigger>
</job>
OK, I worked it out: having the job class implement the StatefulJob interface fixes it.
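Why this helps: a stateful (non-concurrent) job is not started again while an earlier run of the same job is still in progress. A likely explanation for the symptom (an assumption, the post does not confirm it) is that each run takes longer than the one-second interval, so later fires start on other pool threads. In Quartz 2.x, StatefulJob is deprecated and the same behaviour is requested with the @DisallowConcurrentExecution annotation, which the ColorJob further down this page uses. The sketch below is a plain-JDK model of the idea, not Quartz code; `NonConcurrentGate` and its methods are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model (not Quartz code): tracks which job keys are currently running.
class NonConcurrentGate {
    private final Set<String> running = new HashSet<>();

    // Returns true if the job may start now, false if a run with this key is still in progress.
    synchronized boolean tryStart(String jobKey) {
        return running.add(jobKey);
    }

    // Marks the run as finished so that the next fire may start.
    synchronized void finish(String jobKey) {
        running.remove(jobKey);
    }
}

class NonConcurrentGateDemo {
    public static void main(String[] args) {
        NonConcurrentGate gate = new NonConcurrentGate();
        String key = "SMSGroup.ScanClearJob";
        System.out.println(gate.tryStart(key)); // first fire may start
        System.out.println(gate.tryStart(key)); // second fire arrives while the first is running
        gate.finish(key);
        System.out.println(gate.tryStart(key)); // allowed again after completion
    }
}
```

In this model a fire that arrives too early is simply told "not now"; Quartz itself holds the trigger back until the running execution completes.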
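Demystifying the Quartz job scheduling framework in one article
I have written or quoted several articles about Quartz before, and many readers have sent me messages asking Quartz questions:
Quartz error: java.lang.ClassNotFoundException
Quartz source code analysis: understanding the relationships between job, scheduler, calendar, trigger and listener
Analysis of missed executions when several triggers fire in the Quartz framework (repost)
With things quiet at the end of the year, I collected those Quartz questions, read through the source code along the way, and wrote up some conclusions. I hope they help some readers or shorten their own digging.
Note: the versions used are Quartz 2.2.3 and Spring Boot 2.1.3.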
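1. Core components of Quartz
1.1 Job component
1.1.1 Job
A Job holds the logic of the task. All of that logic lives in the execute() method, and the data needed for a run is available from the JobExecutionContext.
A Job example: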
import java.util.Date;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class ColorJob implements Job {

    private static Logger _log = LoggerFactory.getLogger(ColorJob.class);

    // parameter names specific to this job
    public static final String FAVORITE_COLOR = "favorite color";
    public static final String EXECUTION_COUNT = "count";

    // Since Quartz will re-instantiate a class every time it
    // gets executed, non-static member variables can
    // not be used to maintain state!
    private int _counter = 1;

    /**
     * <p>
     * Empty constructor for job initialization
     * </p>
     * <p>
     * Quartz requires a public empty constructor so that the
     * scheduler can instantiate the class whenever it needs.
     * </p>
     */
    public ColorJob() {
    }

    /**
     * <p>
     * Called by the <code>{@link org.quartz.Scheduler}</code> when a
     * <code>{@link org.quartz.Trigger}</code> fires that is associated with
     * the <code>Job</code>.
     * </p>
     *
     * @throws JobExecutionException
     *             if there is an exception while executing the job.
     */
    public void execute(JobExecutionContext context)
        throws JobExecutionException {

        // This job simply prints out its job name and the
        // date and time that it is running
        JobKey jobKey = context.getJobDetail().getKey();

        // Grab and print passed parameters
        JobDataMap data = context.getJobDetail().getJobDataMap();
        String favoriteColor = data.getString(FAVORITE_COLOR);
        int count = data.getInt(EXECUTION_COUNT);
        _log.info("ColorJob: " + jobKey + " executing at " + new Date() + " " +
            " favorite color is " + favoriteColor + " " +
            " execution count (from job map) is " + count + " " +
            " execution count (from job member variable) is " + _counter);

        // increment the count and store it back into the
        // job map so that job state can be properly maintained
        count++;
        data.put(EXECUTION_COUNT, count);

        // Increment the local member variable
        // This serves no real purpose since job state can not
        // be maintained via member variables!
        _counter++;
    }
}
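The comments in ColorJob say that Quartz creates a new instance of the job class for every execution, so instance fields cannot carry state between runs, while values written back to the JobDataMap can. The plain-JDK sketch below models only that behaviour. It does not use Quartz, and `CountingTask`, `ReinstantiationDemo` and the `"count"` key are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a job class (not a Quartz Job).
class CountingTask {
    private int counter = 1; // instance field: starts at 1 again in every new instance

    // Increments the counter kept in the shared data map and
    // returns the instance-field value seen by this run.
    int run(Map<String, Integer> data) {
        int count = data.getOrDefault("count", 1);
        data.put("count", count + 1);
        return counter++;
    }
}

class ReinstantiationDemo {
    // Models a scheduler that creates a fresh task instance for each fire.
    // Returns {instance-field value seen by the last run, count left in the map}.
    static int[] fire(int times, Map<String, Integer> data) {
        int lastSeen = 0;
        for (int i = 0; i < times; i++) {
            CountingTask task = new CountingTask(); // new instance per execution
            lastSeen = task.run(data);
        }
        return new int[] { lastSeen, data.getOrDefault("count", 1) };
    }

    public static void main(String[] args) {
        Map<String, Integer> data = new HashMap<>();
        data.put("count", 1);
        int[] result = fire(3, data);
        System.out.println("instance field seen by last run: " + result[0]); // 1
        System.out.println("count in data map: " + result[1]);               // 4
    }
}
```

The instance field reads 1 on every run, whereas the map-held count advances from 1 to 4 over three runs, which is the difference the two log fields in ColorJob are meant to show.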
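1.1.2 JobDetail stores the Job's information
It is mainly responsible for:
1. Specifying the Job class to execute and its unique identity (job name and group name)
2. Storing the JobDataMap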
// job1 will only run 5 times (at start time, plus 4 repeats), every 10 seconds
JobDetail job1 = newJob(ColorJob.class).withIdentity("job1", "group1").build();

// pass initialization parameters into the job
job1.getJobDataMap().put(ColorJob.FAVORITE_COLOR, "Green");
job1.getJobDataMap().put(ColorJob.EXECUTION_COUNT, 1);
Stored in the database as follows (the screenshot is not included here):
1.1.3 Quartz JobBuilder provides a fluent API for creating a JobDetail
@Bean
public JobDetail jobDetail() {
    return JobBuilder.newJob().ofType(SampleJob.class)
            .storeDurably()
            .withIdentity("Qrtz_Job_Detail")
            .withDescription("Invoke Sample Job service...")
            .build();
}
1.1.4 Spring JobDetailFactoryBean
A factory bean provided by Spring for creating a JobDetail
@Bean
public JobDetailFactoryBean jobDetail() {
    JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
    jobDetailFactory.setJobClass(SampleJob.class);
    jobDetailFactory.setDescription("Invoke Sample Job service...");
    jobDetailFactory.setDurability(true);
    return jobDetailFactory;
}
1.2 Trigger component
A trigger can be in different states.
Trigger states:
// STATES
String STATE_WAITING = "WAITING";
String STATE_ACQUIRED = "ACQUIRED";
String STATE_EXECUTING = "EXECUTING";
String STATE_COMPLETE = "COMPLETE";
String STATE_BLOCKED = "BLOCKED";
String STATE_ERROR = "ERROR";
String STATE_PAUSED = "PAUSED";
String STATE_PAUSED_BLOCKED = "PAUSED_BLOCKED";
String STATE_DELETED = "DELETED";
Table structure holding the state (the screenshot is not included here)
Trigger types:
// TRIGGER TYPES
/** Simple Trigger type. */
String TTYPE_SIMPLE = "SIMPLE";
/** Cron Trigger type. */
String TTYPE_CRON = "CRON";
/** Calendar Interval Trigger type. */
String TTYPE_CAL_INT = "CAL_INT";
/** Daily Time Interval Trigger type. */
String TTYPE_DAILY_TIME_INT = "DAILY_I";
/** A general blob Trigger type. */
String TTYPE_BLOB = "BLOB";
Corresponding table structure (the screenshot is not included here)
1.2.1 Trigger example
SimpleTrigger trigger1 = newTrigger().withIdentity("trigger1", "group1").startAt(startTime)
.withSchedule(simpleSchedule().withIntervalInSeconds(10).withRepeatCount(4)).build();
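With withIntervalInSeconds(10) and withRepeatCount(4), the trigger fires once at the start time and then repeats four more times, five fires in total, which is what the "5 times" comment on job1 refers to. The sketch below only reproduces that arithmetic with java.time. It ignores misfires and calendars, and `SimpleFireTimes` is a hypothetical helper, not part of Quartz.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

class SimpleFireTimes {
    // Fire times of a simple repeating schedule: the start time plus repeatCount repeats.
    static List<Instant> fireTimes(Instant start, Duration interval, int repeatCount) {
        List<Instant> times = new ArrayList<>();
        for (int i = 0; i <= repeatCount; i++) {
            times.add(start.plus(interval.multipliedBy(i)));
        }
        return times;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2019-01-01T00:00:00Z"); // arbitrary example start time
        for (Instant t : fireTimes(start, Duration.ofSeconds(10), 4)) {
            System.out.println(t);
        }
    }
}
```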
The trigger as stored in MySQL (the screenshot is not included here)
1.2.2 Quartz TriggerBuilder
Provides a fluent API for creating a Trigger
@Bean
public Trigger trigger(JobDetail job) {
    return TriggerBuilder.newTrigger().forJob(job)
            .withIdentity("Qrtz_Trigger")
            .withDescription("Sample trigger")
            .withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1))
            .build();
}
1.2.3 Spring SimpleTriggerFactoryBean
A factory bean provided by Spring for creating a SimpleTrigger
@Bean
public SimpleTriggerFactoryBean trigger(JobDetail job) {
    SimpleTriggerFactoryBean trigger = new SimpleTriggerFactoryBean();
    trigger.setJobDetail(job);
    trigger.setRepeatInterval(3600000);
    trigger.setRepeatCount(SimpleTrigger.REPEAT_INDEFINITELY);
    return trigger;
}
1.3 Scheduler component
1.3.1 Factory class provided by Quartz
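// initialize() and getInputStream() throw checked exceptions, so the method declares them.
// springBeanJobFactory() is another method of the same configuration class (not shown here).
@Bean
public Scheduler scheduler(Trigger trigger, JobDetail job) throws IOException, SchedulerException {
    StdSchedulerFactory factory = new StdSchedulerFactory();
    factory.initialize(new ClassPathResource("quartz.properties").getInputStream());

    Scheduler scheduler = factory.getScheduler();
    scheduler.setJobFactory(springBeanJobFactory());
    scheduler.scheduleJob(job, trigger);

    scheduler.start();
    return scheduler;
}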
1.3.2 Factory bean provided by Spring
@Bean
public SchedulerFactoryBean scheduler(Trigger trigger, JobDetail job) {
    SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
    schedulerFactory.setConfigLocation(new ClassPathResource("quartz.properties"));

    schedulerFactory.setJobFactory(springBeanJobFactory());
    schedulerFactory.setJobDetails(job);
    schedulerFactory.setTriggers(trigger);
    return schedulerFactory;
}
2. How it works
2.1 The core class QuartzScheduler
StdScheduler, the Scheduler implementation, wraps the core worker class QuartzScheduler:
/**
 * <p>
 * Construct a <code>StdScheduler</code> instance to proxy the given
 * <code>QuartzScheduler</code> instance, and with the given <code>SchedulingContext</code>.
 * </p>
 */
public StdScheduler(QuartzScheduler sched) {
    this.sched = sched;
}
2.2 Storing and retrieving a JobDetail
public void addJob(JobDetail jobDetail, boolean replace, boolean storeNonDurableWhileAwaitingScheduling) throws SchedulerException {
    validateState();

    if (!storeNonDurableWhileAwaitingScheduling && !jobDetail.isDurable()) {
        throw new SchedulerException(
                "Jobs added with no trigger must be durable.");
    }

    resources.getJobStore().storeJob(jobDetail, replace);
    notifySchedulerThread(0L);
    notifySchedulerListenersJobAdded(jobDetail);
}
2.2.1 Storing the JobDetail (using MySQL over JDBC as the example)
/**
 * <p>
 * Insert or update a job.
 * </p>
 */
protected void storeJob(Connection conn,
        JobDetail newJob, boolean replaceExisting)
    throws JobPersistenceException {

    boolean existingJob = jobExists(conn, newJob.getKey());
    try {
        if (existingJob) {
            if (!replaceExisting) {
                throw new ObjectAlreadyExistsException(newJob);
            }
            getDelegate().updateJobDetail(conn, newJob);
        } else {
            getDelegate().insertJobDetail(conn, newJob);
        }
    } catch (IOException e) {
        throw new JobPersistenceException("Couldn't store job: "
                + e.getMessage(), e);
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't store job: "
                + e.getMessage(), e);
    }
}
This calls the StdJDBCDelegate implementation:
/**
 * <p>
 * Insert the job detail record.
 * </p>
 *
 * @param conn
 *          the DB Connection
 * @param job
 *          the job to insert
 * @return number of rows inserted
 * @throws IOException
 *           if there were problems serializing the JobDataMap
 */
public int insertJobDetail(Connection conn, JobDetail job)
    throws IOException, SQLException {
    ByteArrayOutputStream baos = serializeJobData(job.getJobDataMap());

    PreparedStatement ps = null;

    int insertResult = 0;

    try {
        ps = conn.prepareStatement(rtp(INSERT_JOB_DETAIL));
        ps.setString(1, job.getKey().getName());
        ps.setString(2, job.getKey().getGroup());
        ps.setString(3, job.getDescription());
        ps.setString(4, job.getJobClass().getName());
        setBoolean(ps, 5, job.isDurable());
        setBoolean(ps, 6, job.isConcurrentExectionDisallowed());
        setBoolean(ps, 7, job.isPersistJobDataAfterExecution());
        setBoolean(ps, 8, job.requestsRecovery());
        setBytes(ps, 9, baos);

        insertResult = ps.executeUpdate();
    } finally {
        closeStatement(ps);
    }

    return insertResult;
}
Note: the JobDataMap is serialized and stored in the database as a BLOB.
The SQL executed, defined in StdJDBCConstants:
String INSERT_JOB_DETAIL = "INSERT INTO "
+ TABLE_PREFIX_SUBST + TABLE_JOB_DETAILS + " ("
+ COL_SCHEDULER_NAME + ", " + COL_JOB_NAME
+ ", " + COL_JOB_GROUP + ", " + COL_DESCRIPTION + ", "
+ COL_JOB_CLASS + ", " + COL_IS_DURABLE + ", "
+ COL_IS_NONCONCURRENT + ", " + COL_IS_UPDATE_DATA + ", "
+ COL_REQUESTS_RECOVERY + ", "
+ COL_JOB_DATAMAP + ") " + " VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?)";
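insertJobDetail serializes the JobDataMap into a byte stream before binding it to the BLOB column. A minimal plain-JDK sketch of that round trip follows, assuming standard Java serialization of a map whose values are all Serializable. `BlobRoundTrip` is a hypothetical helper for illustration and is not Quartz's serializeJobData.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

class BlobRoundTrip {
    // Serializes a copy of the map into the bytes that would be written to a BLOB column.
    static byte[] toBlob(Map<String, Object> map) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(new HashMap<>(map));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    // Rebuilds the map from the bytes read back from the BLOB column.
    @SuppressWarnings("unchecked")
    static Map<String, Object> fromBlob(byte[] blob) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(blob))) {
            return (Map<String, Object>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class of a stored value is not on the classpath", e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> data = new HashMap<>();
        data.put("favorite color", "Green");
        data.put("count", 1);
        byte[] blob = toBlob(data);
        System.out.println(fromBlob(blob).equals(data));
    }
}
```

Everything put into the map has to be Serializable for this to work, and the classes of the stored values must still be loadable when the row is read back.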
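2.2.2 Querying a JobDetail
To emphasize: the JobDataMap inside a JobDetail is stored in the database as a BLOB (the useProperties setting can switch this to string storage; the default is false, meaning BLOB storage), so the query needs special handling. From StdJDBCDelegate.java: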
/**
 * <p>
 * Select the JobDetail object for a given job name / group name.
 * </p>
 *
 * @param conn
 *          the DB Connection
 * @return the populated JobDetail object
 * @throws ClassNotFoundException
 *           if a class found during deserialization cannot be found or if
 *           the job class could not be found
 * @throws IOException
 *           if deserialization causes an error
 */
public JobDetail selectJobDetail(Connection conn, JobKey jobKey,
        ClassLoadHelper loadHelper)
    throws ClassNotFoundException, IOException, SQLException {
    PreparedStatement ps = null;
    ResultSet rs = null;

    try {
        ps = conn.prepareStatement(rtp(SELECT_JOB_DETAIL));
        ps.setString(1, jobKey.getName());
        ps.setString(2, jobKey.getGroup());
        rs = ps.executeQuery();

        JobDetailImpl job = null;

        if (rs.next()) {
            job = new JobDetailImpl();

            job.setName(rs.getString(COL_JOB_NAME));
            job.setGroup(rs.getString(COL_JOB_GROUP));
            job.setDescription(rs.getString(COL_DESCRIPTION));
            job.setJobClass( loadHelper.loadClass(rs.getString(COL_JOB_CLASS), Job.class));
            job.setDurability(getBoolean(rs, COL_IS_DURABLE));
            job.setRequestsRecovery(getBoolean(rs, COL_REQUESTS_RECOVERY));

            Map<?, ?> map = null;
            if (canUseProperties()) {
                map = getMapFromProperties(rs);
            } else {
                map = (Map<?, ?>) getObjectFromBlob(rs, COL_JOB_DATAMAP);
            }

            if (null != map) {
                job.setJobDataMap(new JobDataMap(map));
            }
        }

        return job;
    } finally {
        closeResultSet(rs);
        closeStatement(ps);
    }
}
2.3 Querying a trigger
/**
 * <p>
 * Retrieve the given <code>{@link org.quartz.Trigger}</code>.
 * </p>
 *
 * @return The desired <code>Trigger</code>, or null if there is no
 *         match.
 */
public OperableTrigger retrieveTrigger(final TriggerKey triggerKey) throws JobPersistenceException {
    return (OperableTrigger)executeWithoutLock( // no locks necessary for read...
        new TransactionCallback() {
            public Object execute(Connection conn) throws JobPersistenceException {
                return retrieveTrigger(conn, triggerKey);
            }
        });
}

protected OperableTrigger retrieveTrigger(Connection conn, TriggerKey key)
    throws JobPersistenceException {
    try {

        return getDelegate().selectTrigger(conn, key);
    } catch (Exception e) {
        throw new JobPersistenceException("Couldn't retrieve trigger: "
                + e.getMessage(), e);
    }
}
StdJDBCDelegate.java
/**
 * <p>
 * Select a trigger.
 * </p>
 *
 * @param conn
 *          the DB Connection
 * @return the <code>{@link org.quartz.Trigger}</code> object
 * @throws JobPersistenceException
 */
public OperableTrigger selectTrigger(Connection conn, TriggerKey triggerKey) throws SQLException, ClassNotFoundException,
        IOException, JobPersistenceException {
    PreparedStatement ps = null;
    ResultSet rs = null;

    try {
        OperableTrigger trigger = null;

        ps = conn.prepareStatement(rtp(SELECT_TRIGGER));
        ps.setString(1, triggerKey.getName());
        ps.setString(2, triggerKey.getGroup());
        rs = ps.executeQuery();

        if (rs.next()) {
            String jobName = rs.getString(COL_JOB_NAME);
            String jobGroup = rs.getString(COL_JOB_GROUP);
            String description = rs.getString(COL_DESCRIPTION);
            long nextFireTime = rs.getLong(COL_NEXT_FIRE_TIME);
            long prevFireTime = rs.getLong(COL_PREV_FIRE_TIME);
            String triggerType = rs.getString(COL_TRIGGER_TYPE);
            long startTime = rs.getLong(COL_START_TIME);
            long endTime = rs.getLong(COL_END_TIME);
            String calendarName = rs.getString(COL_CALENDAR_NAME);
            int misFireInstr = rs.getInt(COL_MISFIRE_INSTRUCTION);
            int priority = rs.getInt(COL_PRIORITY);

            Map<?, ?> map = null;
            if (canUseProperties()) {
                map = getMapFromProperties(rs);
            } else {
                map = (Map<?, ?>) getObjectFromBlob(rs, COL_JOB_DATAMAP);
            }

            Date nft = null;
            if (nextFireTime > 0) {
                nft = new Date(nextFireTime);
            }

            Date pft = null;
            if (prevFireTime > 0) {
                pft = new Date(prevFireTime);
            }
            Date startTimeD = new Date(startTime);
            Date endTimeD = null;
            if (endTime > 0) {
                endTimeD = new Date(endTime);
            }

            if (triggerType.equals(TTYPE_BLOB)) {
                rs.close(); rs = null;
                ps.close(); ps = null;

                ps = conn.prepareStatement(rtp(SELECT_BLOB_TRIGGER));
                ps.setString(1, triggerKey.getName());
                ps.setString(2, triggerKey.getGroup());
                rs = ps.executeQuery();

                if (rs.next()) {
                    trigger = (OperableTrigger) getObjectFromBlob(rs, COL_BLOB);
                }
            }
            else {
                TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(triggerType);

                if(tDel == null)
                    throw new JobPersistenceException("No TriggerPersistenceDelegate for trigger discriminator type: " + triggerType);

                TriggerPropertyBundle triggerProps = null;
                try {
                    triggerProps = tDel.loadExtendedTriggerProperties(conn, triggerKey);
                } catch (IllegalStateException isex) {
                    if (isTriggerStillPresent(ps)) {
                        throw isex;
                    } else {
                        // QTZ-386 Trigger has been deleted
                        return null;
                    }
                }

                TriggerBuilder<?> tb = newTrigger()
                    .withDescription(description)
                    .withPriority(priority)
                    .startAt(startTimeD)
                    .endAt(endTimeD)
                    .withIdentity(triggerKey)
                    .modifiedByCalendar(calendarName)
                    .withSchedule(triggerProps.getScheduleBuilder())
                    .forJob(jobKey(jobName, jobGroup));

                if (null != map) {
                    tb.usingJobData(new JobDataMap(map));
                }

                trigger = (OperableTrigger) tb.build();

                trigger.setMisfireInstruction(misFireInstr);
                trigger.setNextFireTime(nft);
                trigger.setPreviousFireTime(pft);

                setTriggerStateProperties(trigger, triggerProps);
            }
        }

        return trigger;
    } finally {
        closeResultSet(rs);
        closeStatement(ps);
    }
}
The SQL executed:
String SELECT_TRIGGER = "SELECT * FROM "
+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_TRIGGER_NAME + " = ? AND " + COL_TRIGGER_GROUP + " = ?";
As with JobDetail, the BLOB issue applies here as well, so I will not repeat it.
2.4 The scheduling thread QuartzSchedulerThread
/**
 * <p>
 * The main processing loop of the <code>QuartzSchedulerThread</code>.
 * </p>
 */
@Override
public void run() {
    boolean lastAcquireFailed = false;

    while (!halted.get()) {
        try {
            // check if we're supposed to pause...
            synchronized (sigLock) {
                while (paused && !halted.get()) {
                    try {
                        // wait until togglePause(false) is called...
                        sigLock.wait(1000L);
                    } catch (InterruptedException ignore) {
                    }
                }

                if (halted.get()) {
                    break;
                }
            }

            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
            if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                List<OperableTrigger> triggers = null;

                long now = System.currentTimeMillis();

                clearSignaledSchedulingChange();
                try {
                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); //1.
                    lastAcquireFailed = false;
                    if (log.isDebugEnabled())
                        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                } catch (JobPersistenceException jpe) {
                    if(!lastAcquireFailed) {
                        qs.notifySchedulerListenersError(
                            "An error occurred while scanning for the next triggers to fire.",
                            jpe);
                    }
                    lastAcquireFailed = true;
                    continue;
                } catch (RuntimeException e) {
                    if(!lastAcquireFailed) {
                        getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                +e.getMessage(), e);
                    }
                    lastAcquireFailed = true;
                    continue;
                }

                if (triggers != null && !triggers.isEmpty()) {

                    now = System.currentTimeMillis();
                    long triggerTime = triggers.get(0).getNextFireTime().getTime();
                    long timeUntilTrigger = triggerTime - now;
                    while(timeUntilTrigger > 2) {
                        synchronized (sigLock) {
                            if (halted.get()) {
                                break;
                            }
                            if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                try {
                                    // we could have blocked a long while
                                    // on 'synchronize', so we must recompute
                                    now = System.currentTimeMillis();
                                    timeUntilTrigger = triggerTime - now;
                                    if(timeUntilTrigger >= 1)
                                        sigLock.wait(timeUntilTrigger);
                                } catch (InterruptedException ignore) {
                                }
                            }
                        }
                        if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                            break;
                        }
                        now = System.currentTimeMillis();
                        timeUntilTrigger = triggerTime - now;
                    }

                    // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                    if(triggers.isEmpty())
                        continue;

                    // set triggers to 'executing'
                    List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                    boolean goAhead = true;
                    synchronized(sigLock) {
                        goAhead = !halted.get();
                    }
                    if(goAhead) {
                        try {
                            List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); //2
                            if(res != null)
                                bndles = res;
                        } catch (SchedulerException se) {
                            qs.notifySchedulerListenersError(
                                    "An error occurred while firing triggers '"
                                            + triggers + "'", se);
                            //QTZ-179 : a problem occurred interacting with the triggers from the db
                            //we release them and loop again
                            for (int i = 0; i < triggers.size(); i++) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            }
                            continue;
                        }

                    }

                    for (int i = 0; i < bndles.size(); i++) {
                        TriggerFiredResult result =  bndles.get(i);
                        TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                        Exception exception = result.getException();

                        if (exception instanceof RuntimeException) {
                            getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            continue;
                        }

                        // it's possible to get 'null' if the triggers was paused,
                        // blocked, or other similar occurrences that prevent it being
                        // fired at this time...  or if the scheduler was shutdown (halted)
                        if (bndle == null) {
                            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            continue;
                        }

                        JobRunShell shell = null;
                        try {
                            shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                            shell.initialize(qs);
                        } catch (SchedulerException se) {
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            continue;
                        }

                        if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                            // this case should never happen, as it is indicative of the
                            // scheduler being shutdown or a bug in the thread pool or
                            // a thread pool being used concurrently - which the docs
                            // say not to do...
                            getLog().error("ThreadPool.runInThread() return false!");
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                        }

                    }

                    continue; // while (!halted)
                }
            } else { // if(availThreadCount > 0)
                // should never happen, if threadPool.blockForAvailableThreads() follows contract
                continue; // while (!halted)
            }

            long now = System.currentTimeMillis();
            long waitTime = now + getRandomizedIdleWaitTime();
            long timeUntilContinue = waitTime - now;
            synchronized(sigLock) {
                try {
                    if(!halted.get()) {
                        // QTZ-336 A job might have been completed in the mean time and we might have
                        // missed the scheduled changed signal by not waiting for the notify() yet
                        // Check that before waiting for too long in case this very job needs to be
                        // scheduled very soon
                        if (!isScheduleChanged()) {
                            sigLock.wait(timeUntilContinue);
                        }
                    }
                } catch (InterruptedException ignore) {
                }
            }

        } catch(RuntimeException re) {
            getLog().error("Runtime error occurred in main trigger firing loop.", re);
        }
    } // while (!halted)

    // drop references to scheduler stuff to aid garbage collection...
    qs = null;
    qsRsrcs = null;
}
2.4.1 Acquiring triggers (the call marked //1. in run() above)
protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
    throws JobPersistenceException {
    if (timeWindow < 0) {
        throw new IllegalArgumentException();
    }

    List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
    Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
    final int MAX_DO_LOOP_RETRY = 3;
    int currentLoopCount = 0;
    do {
        currentLoopCount ++;
        try {
            List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);

            // No trigger is ready to fire yet.
            if (keys == null || keys.size() == 0)
                return acquiredTriggers;

            long batchEnd = noLaterThan;

            for(TriggerKey triggerKey: keys) {
                // If our trigger is no longer available, try a new one.
                OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
                if(nextTrigger == null) {
                    continue; // next trigger
                }

                // If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
                // put it back into the timeTriggers set and continue to search for next trigger.
                JobKey jobKey = nextTrigger.getJobKey();
                JobDetail job;
                try {
                    job = retrieveJob(conn, jobKey);
                } catch (JobPersistenceException jpe) {
                    try {
                        getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                        getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR);
                    } catch (SQLException sqle) {
                        getLog().error("Unable to set trigger state to ERROR.", sqle);
                    }
                    continue;
                }

                if (job.isConcurrentExectionDisallowed()) {
                    if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
                        continue; // next trigger
                    } else {
                        acquiredJobKeysForNoConcurrentExec.add(jobKey);
                    }
                }

                if (nextTrigger.getNextFireTime().getTime() > batchEnd) {
                    break;
                }
                // We now have a acquired trigger, let's add to return list.
                // If our trigger was no longer in the expected state, try a new one.
                int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
                if (rowsUpdated <= 0) {
                    continue; // next trigger
                }
                nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
                getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);

                if(acquiredTriggers.isEmpty()) {
                    batchEnd = Math.max(nextTrigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;
                }
                acquiredTriggers.add(nextTrigger);
            }

            // if we didn't end up with any trigger to fire from that first
            // batch, try again for another batch. We allow with a max retry count.
            if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
                continue;
            }

            // We are done with the while loop.
            break;
        } catch (Exception e) {
            throw new JobPersistenceException(
                      "Couldn't acquire next trigger: " + e.getMessage(), e);
        }
    } while (true);

    // Return the acquired trigger list
    return acquiredTriggers;
}
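The key step in acquireNextTrigger is updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING): the state changes only if it is still WAITING, and rowsUpdated <= 0 means another thread or node got there first. The sketch below models that conditional update in memory with ConcurrentMap.replace(key, oldValue, newValue). The real code does this with an SQL UPDATE, and `TriggerStateTable` is a hypothetical name.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory stand-in for the trigger state column.
class TriggerStateTable {
    private final ConcurrentMap<String, String> states = new ConcurrentHashMap<>();

    void store(String triggerKey, String state) {
        states.put(triggerKey, state);
    }

    String stateOf(String triggerKey) {
        return states.get(triggerKey);
    }

    // Moves the trigger to newState only if it is currently in oldState.
    // Returns 1 if the "row" was updated and 0 otherwise, like a JDBC update count.
    int updateStateFromOtherState(String triggerKey, String newState, String oldState) {
        return states.replace(triggerKey, oldState, newState) ? 1 : 0;
    }
}

class AcquireDemo {
    public static void main(String[] args) {
        TriggerStateTable table = new TriggerStateTable();
        table.store("group1.trigger1", "WAITING");

        // Two schedulers race for the same trigger; only the first update succeeds.
        int first = table.updateStateFromOtherState("group1.trigger1", "ACQUIRED", "WAITING");
        int second = table.updateStateFromOtherState("group1.trigger1", "ACQUIRED", "WAITING");

        System.out.println(first + " " + second + " " + table.stateOf("group1.trigger1"));
    }
}
```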
2.4.2 Firing triggers (the call marked //2 in run() above)
protected TriggerFiredBundle triggerFired(Connection conn,
        OperableTrigger trigger)
    throws JobPersistenceException {
    JobDetail job;
    Calendar cal = null;

    // Make sure trigger wasn't deleted, paused, or completed...
    try { // if trigger was deleted, state will be STATE_DELETED
        String state = getDelegate().selectTriggerState(conn,
                trigger.getKey());
        if (!state.equals(STATE_ACQUIRED)) {
            return null;
        }
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't select trigger state: "
                + e.getMessage(), e);
    }

    try {
        job = retrieveJob(conn, trigger.getJobKey());
        if (job == null) { return null; }
    } catch (JobPersistenceException jpe) {
        try {
            getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
            getDelegate().updateTriggerState(conn, trigger.getKey(),
                    STATE_ERROR);
        } catch (SQLException sqle) {
            getLog().error("Unable to set trigger state to ERROR.", sqle);
        }
        throw jpe;
    }

    if (trigger.getCalendarName() != null) {
        cal = retrieveCalendar(conn, trigger.getCalendarName());
        if (cal == null) { return null; }
    }

    try {
        getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't insert fired trigger: "
                + e.getMessage(), e);
    }

    Date prevFireTime = trigger.getPreviousFireTime();

    // call triggered - to update the trigger's next-fire-time state...
    trigger.triggered(cal);

    String state = STATE_WAITING;
    boolean force = true;

    if (job.isConcurrentExectionDisallowed()) {
        state = STATE_BLOCKED;
        force = false;
        try {
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                    STATE_BLOCKED, STATE_WAITING);
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                    STATE_BLOCKED, STATE_ACQUIRED);
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                    STATE_PAUSED_BLOCKED, STATE_PAUSED);
        } catch (SQLException e) {
            throw new JobPersistenceException(
                    "Couldn't update states of blocked triggers: "
                            + e.getMessage(), e);
        }
    }

    if (trigger.getNextFireTime() == null) {
        state = STATE_COMPLETE;
        force = true;
    }

    storeTrigger(conn, trigger, job, true, state, force, false);

    job.getJobDataMap().clearDirtyFlag();

    return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
            .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
            .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
}
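For a job that disallows concurrent execution, triggerFired moves the job's triggers out of the way while it runs: WAITING and ACQUIRED become BLOCKED, and PAUSED becomes PAUSED_BLOCKED. This is the mechanism that answers the question at the top of this page. A small sketch of that mapping as a pure function follows; the enum and method names are hypothetical, and states not touched by those three updates are returned unchanged.

```java
// The state names mirror the STATE_* constants listed in section 1.2.
enum TriggerState {
    WAITING, ACQUIRED, EXECUTING, COMPLETE, BLOCKED, ERROR, PAUSED, PAUSED_BLOCKED, DELETED
}

class BlockingRules {
    // State a trigger of a non-concurrent job is moved to when that job starts running.
    static TriggerState whenJobStarts(TriggerState current) {
        switch (current) {
            case WAITING:
            case ACQUIRED:
                return TriggerState.BLOCKED;
            case PAUSED:
                return TriggerState.PAUSED_BLOCKED;
            default:
                return current; // not affected by the three updates
        }
    }

    public static void main(String[] args) {
        for (TriggerState s : TriggerState.values()) {
            System.out.println(s + " -> " + whenJobStarts(s));
        }
    }
}
```

In Quartz the reverse transitions happen when the job completes (in triggeredJobComplete), which this article does not show.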
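2.4.3 Database locks
StdRowLockSemaphore is for databases that support SELECT ... FOR UPDATE, such as MySQL.
UpdateLockRowSemaphore is for databases that do not support SELECT ... FOR UPDATE, such as MS SQL Server.
StdRowLockSemaphore is implemented as follows: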
public static final String SELECT_FOR_LOCK = "SELECT * FROM "
        + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
        + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";

public static final String INSERT_LOCK = "INSERT INTO "
    + TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES ("
    + SCHED_NAME_SUBST + ", ?)";
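Summary:
1. Quartz has three main components: Job, Trigger and Scheduler. The Job holds the business logic, the Trigger decides when it runs, and the Scheduler coordinates Jobs and Triggers to execute them.
2. With MySQL as the store, StdJDBCDelegate talks to the database, and the SQL it uses is defined in StdJDBCConstants.
3. QuartzScheduler is the core class and the Scheduler (StdScheduler) acts as its proxy; the actual scheduling work is done by QuartzSchedulerThread.
4. The JobStore controls storage. JobStoreSupport has two implementations: JobStoreCMT relies on container-managed transactions, so it does not call commit and rollback itself; JobStoreTX is for standalone environments and handles commit and rollback itself.
5. Database locking is pessimistic, using SELECT ... FOR UPDATE, and is modelled as a Semaphore.
6. The qrtz_scheduler_state table defines the cluster check-in (scan) interval.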