XXL-JOB原理--任务执行时间轮
Posted 归田
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了XXL-JOB原理--任务执行时间轮相关的知识,希望对你有一定的参考价值。
1、介绍
之前 XXL-JOB 任务执行是通过 Quartz来进行任务管理触发的,在之前的博客 《Quartz任务调度框架–任务执行流程》 我们进行了任务执行的流程介绍,目前 XXL-JOB 任务执行已经摒弃 Quartz 框架,目前通过时间轮方式来管理任务触发任务。
2、任务执行
XXL-JOB 任务执行中启动了两个线程:
(1)线程 scheduleThread 运行中不断的从任务表中查询 查询近 5000 毫秒(5秒)中要执行的任务,如果当前时间大于任务接下来要执行的时间则立即执行,否则将任务执行时间除以 1000 变为秒之后再与 60 求余添加到时间轮中。
(2)XXL-JOB 时间轮实现方式比较简单,就是一个 Map 结构数据,key值0-60,value是任务ID列表
Map<Integer, List> ringData
(3)线程 ringThread 运行中不断根据当前时间求余从 时间轮 ringData 中获取任务列表,取出任务之后执行任务。
public class JobScheduleHelper
private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);
private static JobScheduleHelper instance = new JobScheduleHelper();
public static JobScheduleHelper getInstance()
return instance;
// 任务间隔大小
public static final long PRE_READ_MS = 5000; // pre read
private Thread scheduleThread;
private Thread ringThread;
private volatile boolean scheduleThreadToStop = false;
private volatile boolean ringThreadToStop = false;
// 时间轮,key 0-60,value 任务ID列表,两个线程同时处理这个对象
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
public void start()
// schedule thread
scheduleThread = new Thread(new Runnable()
@Override
public void run()
try
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
catch (InterruptedException e)
if (!scheduleThreadToStop)
logger.error(e.getMessage(), e);
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
// 每次读取到任务个数
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop)
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
// 分布式下获取锁
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、pre read
long nowTime = System.currentTimeMillis();
// 查询 当前时间 + 5000 毫秒,就是接下来 5 秒要执行到任务
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0)
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList)
// time-ring jump
// 如果当前时间大于要接下来执行到时间 + 5 秒
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS)
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、misfire match
// 如果每次都有执行则立即触发执行
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum)
// FIRE_ONCE_NOW 》 trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
// 刷新接下来要执行时间
refreshNextValidTime(jobInfo, new Date());
else if (nowTime > jobInfo.getTriggerNextTime())
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1、trigger
// 如果当前时间大于接下来要执行到时间则立即触发执行
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
// 刷新下次执行时间
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime())
// 1、make ring second
// 如果接下来 5 秒内还执行则直接放到时间轮中
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
// 刷新下次执行时间
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
else
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
// 任务还没有到执行时间则直接放到时间轮中
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
// 刷新下次执行时间
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
// 3、update trigger info
for (XxlJobInfo jobInfo: scheduleList)
// 更新任务信息
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
else
preReadSuc = false;
// tx stop
catch (Exception e)
if (!scheduleThreadToStop)
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:", e);
finally
// commit
if (conn != null)
try
conn.commit();
catch (SQLException e)
if (!scheduleThreadToStop)
logger.error(e.getMessage(), e);
try
conn.setAutoCommit(connAutoCommit);
catch (SQLException e)
if (!scheduleThreadToStop)
logger.error(e.getMessage(), e);
try
conn.close();
catch (SQLException e)
if (!scheduleThreadToStop)
logger.error(e.getMessage(), e);
// close PreparedStatement
if (null != preparedStatement)
try
preparedStatement.close();
catch (SQLException e)
if (!scheduleThreadToStop)
logger.error(e.getMessage(), e);
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
if (cost < 1000) // scan-overtime, not wait
try
// pre-read period: success > scan each second; fail > skip this period;
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
catch (InterruptedException e)
if (!scheduleThreadToStop)
logger.error(e.getMessage(), e);
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
);
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
// ring thread
ringThread = new Thread(new Runnable()
@Override
public void run()
// align second
try
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
catch (InterruptedException e)
if (!ringThreadToStop)
logger.error(e.getMessage(), e);
while (!ringThreadToStop)
try
// second data
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
// 获取最近1秒和 2秒要执行到任务
for (int i = 0; i < 2; i++)
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null)
ringItemData.addAll(tmpData);
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0)
// do trigger
for (int jobId: ringItemData)
// do trigger
// 执行任务
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
// clear
ringItemData.clear();
catch (Exception e)
if (!ringThreadToStop)
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:", e);
// next second, align second
try
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
catch (InterruptedException e)
if (!ringThreadToStop)
logger.error(e.getMessage(), e);
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
);
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception
Date nextValidTime = generateNextValidTime(jobInfo, fromTime);
if (nextValidTime != null)
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextValidTime.getTime());
else
jobInfo.setTriggerStatus(0);
jobInfo.setTriggerLastTime(0);
jobInfo.setTriggerNextTime(0);
logger.warn(">>>>>>>>>>> xxl-job, refreshNextValidTime fail for job: jobId=, scheduleType=, scheduleConf=",
jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf());
private void pushTimeRing(int ringSecond, int jobId)
// push async ring
List<Integer> ringItemData = ringData.get(ringSecond);
if (ringItemData == null)
ringItemData = new ArrayList<Integer>();
ringData.put(ringSecond, ringItemData);
ringItemData.add(jobId);
logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
public void toStop()
// 1、stop schedule
scheduleThreadToStop = true;
try
TimeUnit.SECONDS.sleep(1); // wait
catch (InterruptedException e)
logger.error(e.getMessage(), e);
if (scheduleThread.getState() != Thread.State.TERMINATED)
// interrupt and wait
scheduleThread.interrupt();
try
scheduleThread.join();
catch (InterruptedException e)
logger.error(e.getMessage(), e);
// if has ring data
boolean hasRingData = false;
if (!ringData.isEmpty())
for (int second : ringData.keySet())
List<Integer> tmpData = ringData.get(second);
if (tmpData!=null && tmpData.size()>0)
hasRingData = true;
break;
if (hasRingData)
try
TimeUnit.SECONDS.sleep(8);
catch (InterruptedException e)
logger.error(e.getMessage(), e);
// stop ring (wait job-in-memory stop)
ringThreadToStop = true;
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
logger.error(e.getMessage(), e);
if (ringThread.getState() != Thread.State.TERMINATED)
// interrupt and wait
ringThread.interrupt();
try
ringThread.join();
catch (InterruptedException e)
logger.error(e.getMessage(), e);
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");
// ---------------------- tools ----------------------
public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception
ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);
if (ScheduleTypeEnum.CRON == scheduleTypeEnum)
Date nextValidTime = new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime);
return nextValidTime;
else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/)
return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf())*1000 );
return null;
通过以上分析,我们发现 XXL-JOB 通过一个简单到时间轮就可以完成一个任务到倒计时执行操作。
以上是关于XXL-JOB原理--任务执行时间轮的主要内容,如果未能解决你的问题,请参考以下文章
XXL-JOB分布式任务调度框架-源码分析-调度中心对执行器的上下线感知实现原理