分布式定时任务XxlJob用法及核心调度源码详解

Posted Dream_it_possible!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式定时任务XxlJob用法及核心调度源码详解相关的知识,希望对你有一定的参考价值。

  

目录

一、XxlJob 的Executor

1. 使用Spring框架注入

2. 不使用框架注入

3. 使用jar包的形式集成executor

二、XxlJob的核心工作原理

1. 注册JobHandler

2.  注册JobThread

3. JobThread---- 真正执行Job的地方

3. 执行一次任务

4. 启动任务

XxlJobScheduler

TimeRing


      XxlJob是目前最流行的分布式定时任务中间件,对比quartz,代码的侵入明显少了很多,而且admin组件提供了可视化ui, 简单易用,目前已经接入几百家互联网公司使用,XxlJob的强大任务调度能力为广大开发者和企业所认可,那XxlJob是怎么工作的?  

        Tip: 总字数2万多点字,阅读全文大概会耽误您一会喝茶时间~

        XxlJob最新依赖版本: 2.3.0 和源码地址:

 <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>2.3.0</version>
</dependency>

GitHub - xuxueli/xxl-job: A distributed task scheduling framework.(分布式任务调度平台XXL-JOB)

        XxlJob主要包含2个核心模块:  xxl-job-admin 和xxl-job-core。

         

  •  xxl-job-admin 提供可视化的ui页面管理执行器、Job以及查看日志等功能, 默认登录地址为: localhost:8080/xxl-job-admin, 用户名为: admin, 密码为: 123456。
  •  xxl-job-executor 中基于netty实现一个embedServer, 与admin server是一个独立的server ,处理任务调度请求,包含了Job的核心调度实现。

        最新版本使用@XxlJob注解标记Job,  同时支持生命周期Job的任务。

一、XxlJob 的Executor

        XxlJob提供了2个任务执行器,简称Executor, XxlJob通过Executor来管理所有Job的生命周期,包括Job的初始化、启动和销毁等工作,目前的2个主要子类为XxlJobSimpleExecutor和XxlSpringExecutor。

  • XxlJobSimpleExecutor 提供不依赖Spring框架的实现方式。也就是说我不用Spring框架,使用纯Java代码也能使用XxlJob。
  • XxlSpringExecutor 提供基于Spring框架的实现方式。

        XxlJobSimpleExecutor和XxlSpringExecutor都继承了XxlJobExecutor, XxlJobExecutor提供注册Job、初始化Server等功能、核心方法 registJobHandler、initEmbedServer。

               

        注入Job的方式有2种: 基于Spring的Bean 和 纯Java(不使用Spring框架)两种。

1. 使用Spring框架注入

        覆盖XxlJobSpringExecutor, 使用@Value注解读取application.properties里的配置。

package com.xxl.job.executor.core.config;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * xxl-job config
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
public class XxlJobConfig 
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("$xxl.job.admin.addresses")
    private String adminAddresses;

    @Value("$xxl.job.accessToken")
    private String accessToken;

    @Value("$xxl.job.executor.appname")
    private String appname;

    @Value("$xxl.job.executor.address")
    private String address;

    @Value("$xxl.job.executor.ip")
    private String ip;

    @Value("$xxl.job.executor.port")
    private int port;

    @Value("$xxl.job.executor.logpath")
    private String logPath;

    @Value("$xxl.job.executor.logretentiondays")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() 
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    

    /**
     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
     *
     *      1、引入依赖:
     *          <dependency>
     *             <groupId>org.springframework.cloud</groupId>
     *             <artifactId>spring-cloud-commons</artifactId>
     *             <version>$version</version>
     *         </dependency>
     *
     *      2、配置文件,或者容器启动变量
     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     *      3、获取IP
     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */


配置application.properties文件: 

### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin

### xxl-job, access token
xxl.job.accessToken=

### xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9998
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30

创建一个Bean类SampleXxlJob, 每一个被@XxlJob标记方法都是一个Job,使用@XxlJob注解标记方法即可。

package com.xxl.job.executor.service.jobhandler;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/**
 * XxlJob开发示例(Bean模式)
 *
 * 开发步骤:
 *      1、任务开发:在Spring Bean实例中,开发Job方法;
 *      2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
 *      3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;
 *      4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;
 *
 * @author xuxueli 2019-12-11 21:52:51
 */
@Component
public class SampleXxlJob 
    private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);


    /**
     * 1、简单任务示例(Bean模式)
     */
    @XxlJob("demoJobHandler")
    public void demoJobHandler() throws Exception 
        XxlJobHelper.log("XXL-JOB, Hello World.");

        for (int i = 0; i < 5; i++) 
            XxlJobHelper.log("beat at:" + i);
            TimeUnit.SECONDS.sleep(2);
        
        // default success
    


    /**
     * 2、分片广播任务
     */
    @XxlJob("shardingJobHandler")
    public void shardingJobHandler() throws Exception 

        // 分片参数
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();

        XxlJobHelper.log("分片参数:当前分片序号 = , 总分片数 = ", shardIndex, shardTotal);

        // 业务逻辑
        for (int i = 0; i < shardTotal; i++) 
            if (i == shardIndex) 
                XxlJobHelper.log("第  片, 命中分片开始处理", i);
             else 
                XxlJobHelper.log("第  片, 忽略", i);
            
        

    


    /**
     * 3、命令行任务
     */
    @XxlJob("commandJobHandler")
    public void commandJobHandler() throws Exception 
        String command = XxlJobHelper.getJobParam();
        int exitValue = -1;

        BufferedReader bufferedReader = null;
        try 
            // command process
            ProcessBuilder processBuilder = new ProcessBuilder();
            processBuilder.command(command);
            processBuilder.redirectErrorStream(true);

            Process process = processBuilder.start();
            //Process process = Runtime.getRuntime().exec(command);

            BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
            bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));

            // command log
            String line;
            while ((line = bufferedReader.readLine()) != null) 
                XxlJobHelper.log(line);
            

            // command exit
            process.waitFor();
            exitValue = process.exitValue();
         catch (Exception e) 
            XxlJobHelper.log(e);
         finally 
            if (bufferedReader != null) 
                bufferedReader.close();
            
        

        if (exitValue == 0) 
            // default success
         else 
            XxlJobHelper.handleFail("command exit value("+exitValue+") is failed");
        

    


    /**
     * 4、跨平台Http任务
     *  参数示例:
     *      "url: http://www.baidu.com\\n" +
     *      "method: get\\n" +
     *      "data: content\\n";
     */
    @XxlJob("httpJobHandler")
    public void httpJobHandler() throws Exception 

        // param parse
        String param = XxlJobHelper.getJobParam();
        if (param==null || param.trim().length()==0) 
            XxlJobHelper.log("param["+ param +"] invalid.");

            XxlJobHelper.handleFail();
            return;
        

        String[] httpParams = param.split("\\n");
        String url = null;
        String method = null;
        String data = null;
        for (String httpParam: httpParams) 
            if (httpParam.startsWith("url:")) 
                url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
            
            if (httpParam.startsWith("method:")) 
                method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
            
            if (httpParam.startsWith("data:")) 
                data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
            
        

        // param valid
        if (url==null || url.trim().length()==0) 
            XxlJobHelper.log("url["+ url +"] invalid.");

            XxlJobHelper.handleFail();
            return;
        
        if (method==null || !Arrays.asList("GET", "POST").contains(method)) 
            XxlJobHelper.log("method["+ method +"] invalid.");

            XxlJobHelper.handleFail();
            return;
        
        boolean isPostMethod = method.equals("POST");

        // request
        HttpURLConnection connection = null;
        BufferedReader bufferedReader = null;
        try 
            // connection
            URL realUrl = new URL(url);
            connection = (HttpURLConnection) realUrl.openConnection();

            // connection setting
            connection.setRequestMethod(method);
            connection.setDoOutput(isPostMethod);
            connection.setDoInput(true);
            connection.setUseCaches(false);
            connection.setReadTimeout(5 * 1000);
            connection.setConnectTimeout(3 * 1000);
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
            connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");

            // do connection
            connection.connect();

            // data
            if (isPostMethod && data!=null && data.trim().length()>0) 
                DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
                dataOutputStream.write(data.getBytes("UTF-8"));
                dataOutputStream.flush();
                dataOutputStream.close();
            

            // valid StatusCode
            int statusCode = connection.getResponseCode();
            if (statusCode != 200) 
                throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
            

            // result
            bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
            StringBuilder result = new StringBuilder();
            String line;
            while ((line = bufferedReader.readLine()) != null) 
                result.append(line);
            
            String responseMsg = result.toString();

            XxlJobHelper.log(responseMsg);

            return;
         catch (Exception e) 
            XxlJobHelper.log(e);

            XxlJobHelper.handleFail();
            return;
         finally 
            try 
                if (bufferedReader != null) 
                    bufferedReader.close();
                
                if (connection != null) 
                    connection.disconnect();
                
             catch (Exception e2) 
                XxlJobHelper.log(e2);
            
        

    

    /**
     * 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑;
     */
    @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
    public void demoJobHandler2() throws Exception 
        XxlJobHelper.log("XXL-JOB, Hello World.");
    
    public void init()
        logger.info("init");
    
    public void destroy()
        logger.info("destroy");
    



2. 不使用框架注入

        不使用Spring框架也能实现Bean的注入,使用类加载器调用getResourceAsStream方法读取到Properties对象实例里,然后初始化XxlJobExecutor的子类, 在init的时候将所有的声明@XxlJob的类作为bean设置在List<Object> beans里。

package com.bing.sh.job.config;

import com.bing.sh.job.executor.SimpleExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Properties;


@Configuration
public class FrameLessXxlJobConfig 

    private Logger logger = LoggerFactory.getLogger(FrameLessXxlJobConfig.class);


    // singleTon
    private static final FrameLessXxlJobConfig instance = new FrameLessXxlJobConfig();

    public static FrameLessXxlJobConfig getInstance() 
        return instance;
    


    public SimpleExecutor initXxlJobExecutor(String appName, List<Object> beanLists) 
        Properties xxlJobProp = loadProperties("xxl-job-executor.properties");
        // init executor
        SimpleExecutor xxlJobExecutor = new SimpleExecutor();
        xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
        xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
        xxlJobExecutor.setAppname(appName);
        xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
        xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
        xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
        xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
        xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));

        xxlJobExecutor.setXxlJobBeanLists(beanLists);
        try 
            xxlJobExecutor.start();
         catch (Exception e) 
            logger.error(e.getMessage(), e);
        
        return xxlJobExecutor;
    

    public Properties loadProperties(String fileName) 
        InputStreamReader isr = null;
        try 
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            isr = new InputStreamReader(classLoader.getResourceAsStream(fileName), "utf-8");
            if (isr != null) 
                Properties prop = new Properties();
                prop.load(isr);
                return prop;
            
         catch (IOException e) 
            logger.error("load  propeties  error");
        
        return null;

    



        推荐采用第二种方式注入,分布式环境下我们可以使用第二种方式注入,将executor打成jar包,然后在微服务里扫描所有包含@XxlJob的bean, 每个依赖的服务只需要配置自己服务的appName即可。

xxl.job.executor.appname=xxl-job-user-service

        当然也可以采用Springboot的形式注入,只是在配置时,我们需要在每个服务里注入xxlJob的admin url和executor的所有相关信息。

3. 使用jar包的形式集成executor

        新创建一个base-service project ,  将executor的公共的配置放入到base-service里, 执行器的端口设置为:9998。

### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8000/xxl-job-admin

### xxl-job, access token
xxl.job.accessToken=

### xxl-job executor appname
#xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9998
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30

打包发布到本地仓库和私服,添加pom.xml配置:


    <!--将本地jar发布到私服-->
    <distributionManagement>
        <repository>
            <id>maven-releases</id>
            <url>http://192.168.31.129:30081/repository/maven-releases/</url>
        </repository>

        <snapshotRepository>
            <id>maven-snapshots</id>
            <name>Internal Snapshots</name>
            <url>http://192.168.31.129:30081/repository/snapshots/</url>
        </snapshotRepository>

    </distributionManagement>

执行命令:  

mvn clean install package deploy

在自己的服务里添加base-service依赖:

<dependency>
                <groupId>com.bing.sh</groupId>
                <artifactId>base-service</artifactId>
                <version>0.0.1-release</version>
</dependency>

在user-service里的application.properties文件里配置appName:

# xxlJob
xxl.job.executor.appname=xxl-job-user-service

注入appName和所有的bean。

package com.bingbing.sh.config;

import com.bing.sh.job.config.FrameLessXxlJobConfig;
import com.bingbing.sh.job.UserJobHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;

@Configuration
public class XxlJobConfig 


    @Value("$xxl.job.executor.appname")
    private String appName;


    @Bean
    public void initJobExecutor() 
        FrameLessXxlJobConfig frameLessXxlJobConfig = new FrameLessXxlJobConfig();
        frameLessXxlJobConfig.initXxlJobExecutor(appName, Arrays.asList(new UserJobHandler()));
    


二、XxlJob 核心工作原理

1. 注册JobHandler

       Job处理器是XxlJob中调度的单位,也是最终调用目标的任务的载体,所有的Job处理器注册在了一个ConcurrentHashMap里,  在XxlJobExecutor类里,其中map的key 为@XxlJob(value=''')的value值, map的value 一个IJobHandler接口的实例实现。

private static ConcurrentMap<String, IJobHandler> jobHandlerRepository 
    = new ConcurrentHashMap<String, IJobHandler>();

         IJobHandler有3个实现,分别为GlueJobHandler、MethodJobHandler和ScriptJobHandler。

        其中MethodJobHandler能基本满足我们日常的开发需求。 

handler名称描述
GlueJobHandler提供GLUE任务的处理器。
MethodJobHandler提供常规的Bean模式方法Job处理器。
ScriptJobHandler提供脚本处理器。

         最新版本支持生命周期模式,提供init和destroy的存放方法,initMethod和destroyMethod,用法:

        实例化一个MethodJobHandler, 通过clazz获取到Method对象。

    registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));

2.  注册JobThread

        JobThread是运行job的一个线程,可以看做执行Job线程载体,存放在XxlJobExecutor类里 的JobThreadRepository,它也是一个concurrentHashMap。

private static ConcurrentMap<Integer, JobThread> jobThreadRepository 
        = new ConcurrentHashMap();

        注册JobThread方法, 每次注册时会将jobId和Jobhandler作为参数实例化一个JobThread。

    public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason)
        JobThread newJobThread = new JobThread(jobId, handler);
        // 启动线程
        newJobThread.start();
        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:, handler:", new Object[]jobId, handler);

        JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
        if (oldJobThread != null) 
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        

        return newJobThread;
    

        直接调用newJobThread.start()启动JobThread线程,如果该job已经存在于jobThreadRepository里,那么停掉旧线程,这样能始终保证只有一个线程为Job服务,避免有些情况下会出现任务重复执行,发生定时错乱问题。

        可以通过postman调用一个http请求去kill掉该Job,查看XxlJob会在任务执行的时候,重新创建一个新的线程去替代旧线程。

        localhost:9998/kill 是executor提供的一个http请求,参数为"jobId":2。

        

         调用结果:

观察executor的控制台: 

21:23:23.916 logback [Thread-14] INFO  com.xxl.job.core.thread.JobThread - >>>>>>>>>>> xxl-job JobThread stoped, hashCode:Thread[Thread-14,10,main]
21:23:24.014 logback [xxl-rpc, EmbedServer bizThreadPool-1270369654] INFO  c.x.job.core.executor.XxlJobExecutor - >>>>>>>>>>> xxl-job regist JobThread success, jobId:2, handler:com.xxl.job.core.handler.impl.MethodJobHandler@2d99d5a5[class com.bingbing.sh.job.UserJobHandler#initUserHandler]

         也可以跟踪代码发现创建了一个新的线程去替代旧线程。

3. JobThread---- 真正执行Job的地方

         JobThread是一个自定义的线程,也是正在调用@XxlJob标记方法的地方,执行的机制是通过反射,调用的形式是通过启动JobThread线程,  在run()方法里通过handler来执行execute()方法,达到最终调用目标方法的目的。

        看下面一个Job例子,在JobThread是如何执行的呢?

    @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
    public void demoJobHandler2() throws Exception 
        XxlJobHelper.log("XXL-JOB, Hello World.");
    
    public void init()
        logger.info("init");
    
    public void destroy()
        logger.info("destroy");
    

         在run方法里会首先从triggerQueue里poll一个triggerParam, triggerParam 是启动job的一组参数集,在admin 页面 启动任务时将初始化triggerParam, 下一节会提到triggerParam。

        根据调试,默认的getExecutorTimeout() 的值为0,因此直接执行handler.execute() 方法, MethodJobHandler的execute方法如下:

    public void execute() throws Exception 
        Class<?>[] paramTypes = this.method.getParameterTypes();
        if (paramTypes.length > 0) 
            this.method.invoke(this.target);
         else 
            this.method.invoke(this.target);
        

    

        我们在这里看到了最终执行Job的地方是JobThread类里的handler.execute()、handler.init()和handler.destory()方法。

        让我们接着看XxlJob是如何触发执行任务的,简单讲是怎么触发JobThread的启动,是怎么实现在admin页面通过手动的控制任务的启动与终止Job的?

3. 执行一次任务

        在控制台上执行一次任务 ,点击执行:

        核心思想: 执行一次时直接触发任务,发送Http请求 /run 给executor,netty server 接收到请求后,执行run()方法----executorBiz.run(triggerParam), 最终进入JobThread,执行任务。

        接着进入到JobTriggerPoolHelper的addTrigger()方法,这里使用了线程池去执行trigger动作。

   public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) 

        // choose thread pool
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10)       // job-timeout 10 times in 1 min
            triggerPool_ = slowTriggerPool;
        

        // trigger
        triggerPool_.execute(new Runnable() 
            @Override
            public void run() 

                long start = System.currentTimeMillis();

                try 
                    // do trigger
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                 catch (Exception e) 
                    logger.error(e.getMessage(), e);
                 finally 

                    // check timeout-count-map
                    long minTim_now = System.currentTimeMillis()/60000;
                    if (minTim != minTim_now) 
                        minTim = minTim_now;
                        jobTimeoutCountMap.clear();
                    

                    // incr timeout-count-map
                    long cost = System.currentTimeMillis()-start;
                    if (cost > 500)        // ob-timeout threshold 500ms
                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                        if (timeoutCount != null) 
                            timeoutCount.incrementAndGet();
                        
                    
                
            
        );
    

        接着进入到XxlJobTrigger类里的processTrigger方法,看processTrigger主要做了哪几件事?

1) init trigger-param,  创建一个TriggerParam实例。

2) 获取executor的address, 是从xxl_job_group表里读取出来的一个address,该address可自动注册也可在admin后台手动录入。

3) 将TriggerParam 和 address 组合,执行 runExecutor(triggerParam,address)方法。

   ReturnT<String> triggerResult = null;
        if (address != null) 
            triggerResult = runExecutor(triggerParam, address);
         else 
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        

4)  调用 ExecutorBiz 接口的run方法, 实现类为ExecutorBizImpl

        try 
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            runResult = executorBiz.run(triggerParam);
         catch (Exception e) 
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        

 5)  进入到 run() 方法, 执行jobThread 的实例化, 如果有JobId对应了旧的Thread,那么需要用新线程去替换。

        // replace thread (new or exists invalid)
        if (jobThread == null) 
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        

        // push data to queue
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);

 进入到registJobThread, 启动JobThread。

 JobThread 启动成功,意味着JobId对应的目标方法会被调度到。

4. 启动任务

        启动任务与执行一次的触发方式不同,执行一次直接会调用触发器,到executor的run()方法里执行JobThread, 而启动的任务则需要借助JobScheduleHelper来调度执行。

        同时将xxl_job_info表里的任务status 字段置为1,为后续定时任务判断job的状态为启动: 

XxlJobScheduler

        XxlJobScheduler是admin server 初始化的一个bean, 在spring 生命周期中的InitializingBean的afterPropertiesSet() 方法里初始化, 在Spring 容器启动的时会执行afterPropertiesSet() 方法。

public class XxlJobAdminConfig implements InitializingBean, DisposableBean 

    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() 
        return adminConfig;
    


    // ---------------------- XxlJobScheduler ----------------------

    private XxlJobScheduler xxlJobScheduler;

    @Override
    public void afterPropertiesSet() throws Exception 
        adminConfig = this;

        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    

    @Override
    public void destroy() throws Exception 

        xxlJobScheduler.destroy();
    

...

其中XxlJobScheduler的init()方法初始化了一个JobScheduleHelper 帮助定时触发在admin页面配置的Job。

    public void init() throws Exception 
        // init i18n
        initI18n();

        // admin trigger pool start
        JobTriggerPoolHelper.toStart();

        // admin registry monitor run
        JobRegistryHelper.getInstance().start();

        // admin fail-monitor run
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        JobCompleteHelper.getInstance().start();

        // admin log report start
        JobLogReportHelper.getInstance().start();

        // start-schedule  ( depend on JobTriggerPoolHelper )
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    

进入到JobScheduleHelper的start() 方法, start()方法初始化了2个线程:
1)  scheduleThread,  读取xxl_job_info的status为1的所有任务并通过pushTimeRing(int ringSecond, int jobId)方法将 JobId和下次执行时间放入到时间轮里,同时根据cron表达式刷新下次执行时间。

        注:  ringData是通过时间戳的取余计算出来的,如果有相同的key,那么计算出来的值会放在Map的value,即List<Integer>里。

2)  ringThread, 轮询时间轮,取出JobId和下次执行时间,触发Trigger。

进入scheduleThread的run方法里,执行查询xxl_job_info表 status为1的记录:

进入到pushTimeRing, TimeRing 是一个时间轮。

 TimeRing 用来存放触发时间和JobId的组合。

TimeRing

        JobScheduleHelper的start()方法里scheduleThread 将任务放到时间轮里,ringThread的daemon线程处理时间轮里的任务,时间轮需要一个线程去轮询执行,类似于kafka的时间轮机制,也就是遍历ringItemData , 然后挨个去触发Trigger。

   // ring thread
        ringThread = new Thread(new Runnable() 
            @Override
            public void run() 

                while (!ringThreadToStop) 

                    // align second
                    try 
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                     catch (InterruptedException e) 
                        if (!ringThreadToStop) 
                            logger.error(e.getMessage(), e);
                        
                    

                    try 
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                        for (int i = 0; i < 2; i++) 
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) 
                                ringItemData.addAll(tmpData);
                            
                        

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                        if (ringItemData.size() > 0) 
                            // do trigger
                            for (int jobId: ringItemData) 
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            
                            // clear
                            ringItemData.clear();
                        
                     catch (Exception e) 
                        if (!ringThreadToStop) 
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:", e);
                        
                    
                
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            
        );
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();

最终进入到ExecutorBizImpl的run()方法

和上述执行一次的逻辑一样会进入到XxlJobExecutor.registJobThread(int jobId, IJobHandler handler, String removeOldReason)方法,JobThread启动,调用目标方法,核心流程结束。

以上是关于分布式定时任务XxlJob用法及核心调度源码详解的主要内容,如果未能解决你的问题,请参考以下文章

XxlJob XxlJob中的多线程

XxlJob 负载均衡用法及实现原理详解

springcloud技术栈系列4:分布式定时任务

发现mariadb数据库时间晚了12个小时,xxljob定时任务调度异常

xxljob在使用过程中(每五分钟执行一次)前期正常,中间出现过一次卡住的情况,定时任务从某个时间

分布式定时调度:xxl-job 万字详解