XXL-JOB原理--任务执行时间轮

Posted 归田

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了XXL-JOB原理--任务执行时间轮相关的知识,希望对你有一定的参考价值。

1、介绍

之前 XXL-JOB 任务执行是通过 Quartz来进行任务管理触发的,在之前的博客 《Quartz任务调度框架–任务执行流程》 我们进行了任务执行的流程介绍,目前 XXL-JOB 任务执行已经摒弃 Quartz 框架,目前通过时间轮方式来管理任务触发任务。

2、任务执行

XXL-JOB 任务执行中启动了两个线程:
(1)线程 scheduleThread 运行中不断的从任务表中查询 查询近 5000 毫秒(5秒)中要执行的任务,如果当前时间大于任务接下来要执行的时间则立即执行,否则将任务执行时间除以 1000 变为秒之后再与 60 求余添加到时间轮中。
(2)XXL-JOB 时间轮实现方式比较简单,就是一个 Map 结构数据,key值0-60,value是任务ID列表
Map<Integer, List> ringData
(3)线程 ringThread 运行中不断根据当前时间求余从 时间轮 ringData 中获取任务列表,取出任务之后执行任务。

public class JobScheduleHelper 
    private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);

    private static JobScheduleHelper instance = new JobScheduleHelper();
    public static JobScheduleHelper getInstance()
        return instance;
    
    // 任务间隔大小
    public static final long PRE_READ_MS = 5000;    // pre read

    private Thread scheduleThread;
    private Thread ringThread;
    private volatile boolean scheduleThreadToStop = false;
    private volatile boolean ringThreadToStop = false;
    // 时间轮,key 0-60,value 任务ID列表,两个线程同时处理这个对象
    private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    public void start()

        // schedule thread
        scheduleThread = new Thread(new Runnable() 
            @Override
            public void run() 

                try 
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                 catch (InterruptedException e) 
                    if (!scheduleThreadToStop) 
                        logger.error(e.getMessage(), e);
                    
                
                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

                // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                // 每次读取到任务个数
                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

                while (!scheduleThreadToStop) 

                    // Scan Job
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

                    boolean preReadSuc = true;
                    try 

                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);
                        // 分布式下获取锁
                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();

                        // tx start

                        // 1、pre read
                        long nowTime = System.currentTimeMillis();
                        // 查询 当前时间 + 5000 毫秒,就是接下来 5 秒要执行到任务
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList!=null && scheduleList.size()>0) 
                            // 2、push time-ring
                            for (XxlJobInfo jobInfo: scheduleList) 

                                // time-ring jump
                                // 如果当前时间大于要接下来执行到时间 + 5 秒
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) 
                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                    // 1、misfire match
                                    // 如果每次都有执行则立即触发执行
                                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) 
                                        // FIRE_ONCE_NOW 》 trigger
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                    

                                    // 2、fresh next
                                    // 刷新接下来要执行时间
                                    refreshNextValidTime(jobInfo, new Date());

                                 else if (nowTime > jobInfo.getTriggerNextTime()) 
                                    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                                    // 1、trigger
                                    // 如果当前时间大于接下来要执行到时间则立即触发执行
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                                    // 2、fresh next
                                    // 刷新下次执行时间
                                    refreshNextValidTime(jobInfo, new Date());

                                    // next-trigger-time in 5s, pre-read again
                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) 

                                        // 1、make ring second
                                        // 如果接下来 5 秒内还执行则直接放到时间轮中
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                        // 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3、fresh next
                                        // 刷新下次执行时间
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                    

                                 else 
                                    // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

                                    // 1、make ring second
                                    // 任务还没有到执行时间则直接放到时间轮中
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                    // 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3、fresh next
                                    // 刷新下次执行时间
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                

                            

                            // 3、update trigger info
                            for (XxlJobInfo jobInfo: scheduleList) 
                            // 更新任务信息
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            

                         else 
                            preReadSuc = false;
                        

                        // tx stop


                     catch (Exception e) 
                        if (!scheduleThreadToStop) 
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:", e);
                        
                     finally 

                        // commit
                        if (conn != null) 
                            try 
                                conn.commit();
                             catch (SQLException e) 
                                if (!scheduleThreadToStop) 
                                    logger.error(e.getMessage(), e);
                                
                            
                            try 
                                conn.setAutoCommit(connAutoCommit);
                             catch (SQLException e) 
                                if (!scheduleThreadToStop) 
                                    logger.error(e.getMessage(), e);
                                
                            
                            try 
                                conn.close();
                             catch (SQLException e) 
                                if (!scheduleThreadToStop) 
                                    logger.error(e.getMessage(), e);
                                
                            
                        

                        // close PreparedStatement
                        if (null != preparedStatement) 
                            try 
                                preparedStatement.close();
                             catch (SQLException e) 
                                if (!scheduleThreadToStop) 
                                    logger.error(e.getMessage(), e);
                                
                            
                        
                    
                    long cost = System.currentTimeMillis()-start;


                    // Wait seconds, align second
                    if (cost < 1000)   // scan-overtime, not wait
                        try 
                            // pre-read period: success > scan each second; fail > skip this period;
                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                         catch (InterruptedException e) 
                            if (!scheduleThreadToStop) 
                                logger.error(e.getMessage(), e);
                            
                        
                    

                

                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
            
        );
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();


        // ring thread
        ringThread = new Thread(new Runnable() 
            @Override
            public void run() 

                // align second
                try 
                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
                 catch (InterruptedException e) 
                    if (!ringThreadToStop) 
                        logger.error(e.getMessage(), e);
                    
                

                while (!ringThreadToStop) 

                    try 
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                        // 获取最近1秒和 2秒要执行到任务
                        for (int i = 0; i < 2; i++) 
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) 
                                ringItemData.addAll(tmpData);
                            
                        

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                        if (ringItemData.size() > 0) 
                            // do trigger
                            for (int jobId: ringItemData) 
                                // do trigger
                                // 执行任务
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            
                            // clear
                            ringItemData.clear();
                        
                     catch (Exception e) 
                        if (!ringThreadToStop) 
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:", e);
                        
                    

                    // next second, align second
                    try 
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
                     catch (InterruptedException e) 
                        if (!ringThreadToStop) 
                            logger.error(e.getMessage(), e);
                        
                    
                
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            
        );
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    

    private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception 
        Date nextValidTime = generateNextValidTime(jobInfo, fromTime);
        if (nextValidTime != null) 
            jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
            jobInfo.setTriggerNextTime(nextValidTime.getTime());
         else 
            jobInfo.setTriggerStatus(0);
            jobInfo.setTriggerLastTime(0);
            jobInfo.setTriggerNextTime(0);
            logger.warn(">>>>>>>>>>> xxl-job, refreshNextValidTime fail for job: jobId=, scheduleType=, scheduleConf=",
                    jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf());
        
    

    private void pushTimeRing(int ringSecond, int jobId)
        // push async ring
        List<Integer> ringItemData = ringData.get(ringSecond);
        if (ringItemData == null) 
            ringItemData = new ArrayList<Integer>();
            ringData.put(ringSecond, ringItemData);
        
        ringItemData.add(jobId);

        logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
    

    public void toStop()

        // 1、stop schedule
        scheduleThreadToStop = true;
        try 
            TimeUnit.SECONDS.sleep(1);  // wait
         catch (InterruptedException e) 
            logger.error(e.getMessage(), e);
        
        if (scheduleThread.getState() != Thread.State.TERMINATED)
            // interrupt and wait
            scheduleThread.interrupt();
            try 
                scheduleThread.join();
             catch (InterruptedException e) 
                logger.error(e.getMessage(), e);
            
        

        // if has ring data
        boolean hasRingData = false;
        if (!ringData.isEmpty()) 
            for (int second : ringData.keySet()) 
                List<Integer> tmpData = ringData.get(second);
                if (tmpData!=null && tmpData.size()>0) 
                    hasRingData = true;
                    break;
                
            
        
        if (hasRingData) 
            try 
                TimeUnit.SECONDS.sleep(8);
             catch (InterruptedException e) 
                logger.error(e.getMessage(), e);
            
        

        // stop ring (wait job-in-memory stop)
        ringThreadToStop = true;
        try 
            TimeUnit.SECONDS.sleep(1);
         catch (InterruptedException e) 
            logger.error(e.getMessage(), e);
        
        if (ringThread.getState() != Thread.State.TERMINATED)
            // interrupt and wait
            ringThread.interrupt();
            try 
                ringThread.join();
             catch (InterruptedException e) 
                logger.error(e.getMessage(), e);
            
        

        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");
    


    // ---------------------- tools ----------------------
    public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception 
        ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);
        if (ScheduleTypeEnum.CRON == scheduleTypeEnum) 
            Date nextValidTime = new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime);
            return nextValidTime;
         else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/) 
            return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf())*1000 );
        
        return null;
    


通过以上分析,我们发现 XXL-JOB 通过一个简单到时间轮就可以完成一个任务到倒计时执行操作。

以上是关于XXL-JOB原理--任务执行时间轮的主要内容,如果未能解决你的问题,请参考以下文章

8. xxl-job 原理-- 调度中心任务执行或触发

xxl-job调度效率与分布式锁等待问题

XXL-JOB分布式任务调度框架-源码分析-调度中心对执行器的上下线感知实现原理

xxl-job后继任务导致前一个任务执行一半,源码分析xxljob

XXL-JOB分析(一任务执行的过程源码分析)

定时任务的实现原理:时间轮算法