XxlJob 分布式定时任务XxlJob用法及核心调度源码详解
Posted Dream_it_possible!
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了XxlJob 分布式定时任务XxlJob用法及核心调度源码详解相关的知识,希望对你有一定的参考价值。
目录
XxlJob是目前最流行的分布式定时任务中间件,对比quartz,代码的侵入明显少了很多,不需要每次在代码里配置job, 而XxlJobd的admin server组件提供了可视化ui, 对job和执行器能够从前端页面配置管理,简单易用,目前已经接入几百家互联网公司使用,XxlJob的强大任务调度能力为广大开发者和企业所认可,那XxlJob是怎么工作的?
Tip: 总字数22922字,阅读全文大概会花您20分钟喝茶时间~
XxlJob最新依赖版本: 2.3.0 和源码地址:
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
GitHub - xuxueli/xxl-job: A distributed task scheduling framework.(分布式任务调度平台XXL-JOB)
XxlJob主要包含2个核心模块: xxl-job-admin 和xxl-job-core。
- xxl-job-admin 提供可视化的ui页面管理执行器、Job以及查看日志等功能, 默认登录地址为: localhost:8080/xxl-job-admin, 用户名为: admin, 密码为: 123456。
- xxl-job-executor 中基于netty实现一个embedServer, 与admin server是一个独立的server ,处理任务调度请求,包含了Job的核心调度实现。
最新版本使用@XxlJob注解标记Job, 同时支持生命周期Job任务。
XxlJob的Executor组件是Job调度的核心实现,配合admin Server 完成周期调度。
一、XxlJob 的Executor
XxlJob提供了2个任务执行器,简称Executor, XxlJob通过Executor来管理所有Job的生命周期,包括Job的初始化、启动和销毁等工作,目前的2个主要子类为XxlJobSimpleExecutor和XxlSpringExecutor。
- XxlJobSimpleExecutor 提供不依赖Spring框架的实现方式。也就是说我不用Spring框架,使用纯Java代码也能使用XxlJob。
- XxlSpringExecutor 提供基于Spring框架的实现方式。
XxlJobSimpleExecutor和XxlSpringExecutor都继承了XxlJobExecutor, XxlJobExecutor提供注册Job、初始化Server等功能、核心方法 registJobHandler、initEmbedServer。
注入Job的方式有2种: 基于Spring的Bean 和 纯Java(不使用Spring框架)两种。
1. 使用Spring框架注入
覆盖XxlJobSpringExecutor, 使用@Value注解读取application.properties里的配置。
package com.xxl.job.executor.core.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
public class XxlJobConfig
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("$xxl.job.admin.addresses")
private String adminAddresses;
@Value("$xxl.job.accessToken")
private String accessToken;
@Value("$xxl.job.executor.appname")
private String appname;
@Value("$xxl.job.executor.address")
private String address;
@Value("$xxl.job.executor.ip")
private String ip;
@Value("$xxl.job.executor.port")
private int port;
@Value("$xxl.job.executor.logpath")
private String logPath;
@Value("$xxl.job.executor.logretentiondays")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor()
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>$version</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
配置application.properties文件:
### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### xxl-job, access token
xxl.job.accessToken=
### xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9998
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30
创建一个Bean类SampleXxlJob, 每一个被@XxlJob标记方法都是一个Job,使用@XxlJob注解标记方法即可。
package com.xxl.job.executor.service.jobhandler;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
* XxlJob开发示例(Bean模式)
*
* 开发步骤:
* 1、任务开发:在Spring Bean实例中,开发Job方法;
* 2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
* 3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;
* 4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;
*
* @author xuxueli 2019-12-11 21:52:51
*/
@Component
public class SampleXxlJob
private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);
/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception
XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++)
XxlJobHelper.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
// default success
/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("分片参数:当前分片序号 = , 总分片数 = ", shardIndex, shardTotal);
// 业务逻辑
for (int i = 0; i < shardTotal; i++)
if (i == shardIndex)
XxlJobHelper.log("第 片, 命中分片开始处理", i);
else
XxlJobHelper.log("第 片, 忽略", i);
/**
* 3、命令行任务
*/
@XxlJob("commandJobHandler")
public void commandJobHandler() throws Exception
String command = XxlJobHelper.getJobParam();
int exitValue = -1;
BufferedReader bufferedReader = null;
try
// command process
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command(command);
processBuilder.redirectErrorStream(true);
Process process = processBuilder.start();
//Process process = Runtime.getRuntime().exec(command);
BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));
// command log
String line;
while ((line = bufferedReader.readLine()) != null)
XxlJobHelper.log(line);
// command exit
process.waitFor();
exitValue = process.exitValue();
catch (Exception e)
XxlJobHelper.log(e);
finally
if (bufferedReader != null)
bufferedReader.close();
if (exitValue == 0)
// default success
else
XxlJobHelper.handleFail("command exit value("+exitValue+") is failed");
/**
* 4、跨平台Http任务
* 参数示例:
* "url: http://www.baidu.com\\n" +
* "method: get\\n" +
* "data: content\\n";
*/
@XxlJob("httpJobHandler")
public void httpJobHandler() throws Exception
// param parse
String param = XxlJobHelper.getJobParam();
if (param==null || param.trim().length()==0)
XxlJobHelper.log("param["+ param +"] invalid.");
XxlJobHelper.handleFail();
return;
String[] httpParams = param.split("\\n");
String url = null;
String method = null;
String data = null;
for (String httpParam: httpParams)
if (httpParam.startsWith("url:"))
url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
if (httpParam.startsWith("method:"))
method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
if (httpParam.startsWith("data:"))
data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
// param valid
if (url==null || url.trim().length()==0)
XxlJobHelper.log("url["+ url +"] invalid.");
XxlJobHelper.handleFail();
return;
if (method==null || !Arrays.asList("GET", "POST").contains(method))
XxlJobHelper.log("method["+ method +"] invalid.");
XxlJobHelper.handleFail();
return;
boolean isPostMethod = method.equals("POST");
// request
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try
// connection
URL realUrl = new URL(url);
connection = (HttpURLConnection) realUrl.openConnection();
// connection setting
connection.setRequestMethod(method);
connection.setDoOutput(isPostMethod);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(5 * 1000);
connection.setConnectTimeout(3 * 1000);
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
// do connection
connection.connect();
// data
if (isPostMethod && data!=null && data.trim().length()>0)
DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
dataOutputStream.write(data.getBytes("UTF-8"));
dataOutputStream.flush();
dataOutputStream.close();
// valid StatusCode
int statusCode = connection.getResponseCode();
if (statusCode != 200)
throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
// result
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
StringBuilder result = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null)
result.append(line);
String responseMsg = result.toString();
XxlJobHelper.log(responseMsg);
return;
catch (Exception e)
XxlJobHelper.log(e);
XxlJobHelper.handleFail();
return;
finally
try
if (bufferedReader != null)
bufferedReader.close();
if (connection != null)
connection.disconnect();
catch (Exception e2)
XxlJobHelper.log(e2);
/**
* 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑;
*/
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
public void demoJobHandler2() throws Exception
XxlJobHelper.log("XXL-JOB, Hello World.");
public void init()
logger.info("init");
public void destroy()
logger.info("destroy");
2. 不使用框架注入
不使用Spring框架也能实现Bean的注入,使用类加载器调用getResourceAsStream方法读取到Properties对象实例里,然后初始化XxlJobExecutor的子类, 在init的时候将所有的声明@XxlJob的类作为bean设置在List<Object> beans里。
package com.bing.sh.job.config;
import com.bing.sh.job.executor.SimpleExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Properties;
@Configuration
public class FrameLessXxlJobConfig
private Logger logger = LoggerFactory.getLogger(FrameLessXxlJobConfig.class);
// singleTon
private static final FrameLessXxlJobConfig instance = new FrameLessXxlJobConfig();
public static FrameLessXxlJobConfig getInstance()
return instance;
public SimpleExecutor initXxlJobExecutor(String appName, List<Object> beanLists)
Properties xxlJobProp = loadProperties("xxl-job-executor.properties");
// init executor
SimpleExecutor xxlJobExecutor = new SimpleExecutor();
xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
xxlJobExecutor.setAppname(appName);
xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));
xxlJobExecutor.setXxlJobBeanLists(beanLists);
try
xxlJobExecutor.start();
catch (Exception e)
logger.error(e.getMessage(), e);
return xxlJobExecutor;
public Properties loadProperties(String fileName)
InputStreamReader isr = null;
try
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
isr = new InputStreamReader(classLoader.getResourceAsStream(fileName), "utf-8");
if (isr != null)
Properties prop = new Properties();
prop.load(isr);
return prop;
catch (IOException e)
logger.error("load propeties error");
return null;
推荐采用第二种方式注入,分布式环境下我们可以使用第二种方式注入,将executor打成jar包,然后在微服务里扫描所有包含@XxlJob的bean, 每个依赖的服务只需要配置自己服务的appName即可。
xxl.job.executor.appname=xxl-job-user-service
当然也可以采用Springboot的形式注入,只是在配置时,我们需要在每个服务里注入xxlJob的admin url和executor的所有相关信息。
3. 使用jar包的形式集成executor
新创建一个base-service project , 将executor的公共的配置放入到base-service里, 执行器的端口设置为:9998。
### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8000/xxl-job-admin
### xxl-job, access token
xxl.job.accessToken=
### xxl-job executor appname
#xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9998
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30
打包发布到本地仓库和私服,添加pom.xml配置:
<!--将本地jar发布到私服-->
<distributionManagement>
<repository>
<id>maven-releases</id>
<url>http://192.168.31.129:30081/repository/maven-releases/</url>
</repository>
<snapshotRepository>
<id>maven-snapshots</id>
<name>Internal Snapshots</name>
<url>http://192.168.31.129:30081/repository/snapshots/</url>
</snapshotRepository>
</distributionManagement>
执行命令:
mvn clean install package deploy
在自己的服务里添加base-service依赖:
<dependency>
<groupId>com.bing.sh</groupId>
<artifactId>base-service</artifactId>
<version>0.0.1-release</version>
</dependency>
在user-service里的application.properties文件里配置appName:
# xxlJob
xxl.job.executor.appname=xxl-job-user-service
注入appName和所有的bean。
package com.bingbing.sh.config;
import com.bing.sh.job.config.FrameLessXxlJobConfig;
import com.bingbing.sh.job.UserJobHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
@Configuration
public class XxlJobConfig
@Value("$xxl.job.executor.appname")
private String appName;
@Bean
public void initJobExecutor()
FrameLessXxlJobConfig frameLessXxlJobConfig = new FrameLessXxlJobConfig();
frameLessXxlJobConfig.initXxlJobExecutor(appName, Arrays.asList(new UserJobHandler()));
二、XxlJob 核心工作原理
1. 注册JobHandler
Job处理器是XxlJob中调度的单位,也是最终调用目标的任务的载体,所有的Job处理器注册在了一个ConcurrentHashMap里, 在XxlJobExecutor类里,其中map的key 为@XxlJob(value=''')的value值, map的value 一个IJobHandler接口的实例实现。
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository
= new ConcurrentHashMap<String, IJobHandler>();
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
IJobHandler有3个实现,分别为GlueJobHandler、MethodJobHandler和ScriptJobHandler。
handler名称 | 描述 |
GlueJobHandler | 提供GLUE任务的处理器。 |
MethodJobHandler | 提供常规的Bean模式方法Job处理器。 |
ScriptJobHandler | 提供脚本处理器。 |
其中MethodJobHandler能基本满足我们日常的开发需求。
最新版本支持生命周期模式,提供init和destroy的存放方法,MethodHandler包含3个Method属性: executeMethod 、initMethod和destroyMethod,用法:
实例化一个MethodJobHandler,然后根据XxlJob注解里的定义的init、destory和value值找到对应的method对象,封装到MethodJobHandler里。
2. 注册JobThread
JobThread是运行job的一个线程,可以看做执行Job线程载体,存放在XxlJobExecutor类里 的JobThreadRepository,它也是一个concurrentHashMap。
private static ConcurrentMap<Integer, JobThread> jobThreadRepository
= new ConcurrentHashMap();
注册JobThread方法, 每次注册时会将jobId和Jobhandler作为参数实例化一个JobThread。
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason)
JobThread newJobThread = new JobThread(jobId, handler);
// 启动线程
newJobThread.start();
logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:, handler:", new Object[]jobId, handler);
JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
if (oldJobThread != null)
oldJobThread.toStop(removeOldReason);
oldJobThread.interrupt();
return newJobThread;
直接调用newJobThread.start()启动JobThread线程,如果该job已经存在于jobThreadRepository里,那么停掉旧线程,这样能始终保证只有一个线程为Job服务,避免有些情况下会出现任务重复执行,发生定时错乱问题。
可以通过postman调用一个http请求去kill掉该Job,查看XxlJob会在任务执行的时候,重新创建一个新的线程去替代旧线程。
localhost:9998/kill 是executor提供的一个http请求,参数为"jobId":2。
调用结果:
观察executor的控制台:
21:23:23.916 logback [Thread-14] INFO com.xxl.job.core.thread.JobThread - >>>>>>>>>>> xxl-job JobThread stoped, hashCode:Thread[Thread-14,10,main]
21:23:24.014 logback [xxl-rpc, EmbedServer bizThreadPool-1270369654] INFO c.x.job.core.executor.XxlJobExecutor - >>>>>>>>>>> xxl-job regist JobThread success, jobId:2, handler:com.xxl.job.core.handler.impl.MethodJobHandler@2d99d5a5[class com.bingbing.sh.job.UserJobHandler#initUserHandler]
也可以跟踪代码发现创建了一个新的线程去替代旧线程。
3. JobThread---- 真正执行Job的地方
JobThread是一个自定义的线程,也是正在调用@XxlJob标记方法的地方,执行的机制是通过反射,调用的形式是通过启动JobThread线程, 在run()方法里通过handler来执行execute()方法,达到最终调用目标方法的目的。
看下面一个Job例子,在JobThread是如何执行的呢?
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
public void demoJobHandler2() throws Exception
XxlJobHelper.log("XXL-JOB, Hello World.");
public void init()
logger.info("init");
public void destroy()
logger.info("destroy");
在run方法里会首先从triggerQueue里poll一个triggerParam, triggerParam 是启动job的一组参数集,在admin 页面 启动任务时将初始化triggerParam, 下一节会提到triggerParam。
根据调试,默认的getExecutorTimeout() 的值为0,因此直接执行handler.execute() 方法, MethodJobHandler的execute方法如下:
public void execute() throws Exception
Class<?>[] paramTypes = this.method.getParameterTypes();
if (paramTypes.length > 0)
this.method.invoke(this.target);
else
this.method.invoke(this.target);
我们在这里看到了最终执行Job的地方是JobThread类里的handler.execute()、handler.init()和handler.destory()方法。
让我们接着看XxlJob是如何触发执行任务的,简单讲是怎么触发JobThread的启动,是怎么实现在admin页面通过手动的控制任务的启动与终止Job的?
3. 执行一次任务
在控制台上执行一次任务 ,点击执行:
核心思想: 执行一次时直接触发任务,发送Http请求 /run 给executor,netty server 接收到请求后,执行run()方法----executorBiz.run(triggerParam), 最终进入JobThread,执行任务。
接着进入到JobTriggerPoolHelper的addTrigger()方法,这里使用了线程池去执行trigger动作。
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList)
// choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
// trigger
triggerPool_.execute(new Runnable()
@Override
public void run()
long start = System.currentTimeMillis();
try
// do trigger
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
catch (Exception e)
logger.error(e.getMessage(), e);
finally
// check timeout-count-map
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now)
minTim = minTim_now;
jobTimeoutCountMap.clear();
// incr timeout-count-map
long cost = System.currentTimeMillis()-start;
if (cost > 500) // ob-timeout threshold 500ms
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null)
timeoutCount.incrementAndGet();
);
接着进入到XxlJobTrigger类里的processTrigger方法,看processTrigger主要做了哪几件事?
1) init trigger-param, 创建一个TriggerParam实例。
2) 获取executor的address, 是从xxl_job_group表里读取出来的一个address,该address可自动注册也可在admin后台手动录入。
3) 将TriggerParam 和 address 组合,执行 runExecutor(triggerParam,address)方法。
ReturnT<String> triggerResult = null;
if (address != null)
triggerResult = runExecutor(triggerParam, address);
else
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
4) 调用 ExecutorBiz 接口的run方法, 实现类为ExecutorBizImpl
try
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
runResult = executorBiz.run(triggerParam);
catch (Exception e)
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
5) 进入到 run() 方法, 执行jobThread 的实例化, 如果有JobId对应了旧的Thread,那么需要用新线程去替换。
// replace thread (new or exists invalid)
if (jobThread == null)
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
// push data to queue
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
进入到registJobThread, 启动JobThread。
JobThread 启动成功,意味着JobId对应的目标方法会被调度到。
4. 启动任务
启动任务与执行一次的触发方式不同,执行一次直接会调用触发器,到executor的run()方法里执行JobThread, 而启动的任务则需要借助JobScheduleHelper来调度执行。
同时将xxl_job_info表里的任务status 字段置为1,为后续定时任务判断job的状态为启动:
XxlJobScheduler
XxlJobScheduler是admin server 初始化的一个bean, 在spring 生命周期中的InitializingBean的afterPropertiesSet() 方法里初始化, 在Spring 容器启动的时会执行afterPropertiesSet() 方法。
public class XxlJobAdminConfig implements InitializingBean, DisposableBean
private static XxlJobAdminConfig adminConfig = null;
public static XxlJobAdminConfig getAdminConfig()
return adminConfig;
// ---------------------- XxlJobScheduler ----------------------
private XxlJobScheduler xxlJobScheduler;
@Override
public void afterPropertiesSet() throws Exception
adminConfig = this;
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
@Override
public void destroy() throws Exception
xxlJobScheduler.destroy();
...
其中XxlJobScheduler的init()方法初始化了一个JobScheduleHelper 帮助定时触发在admin页面配置的Job。
public void init() throws Exception
// init i18n
initI18n();
// admin trigger pool start
JobTriggerPoolHelper.toStart();
// admin registry monitor run
JobRegistryHelper.getInstance().start();
// admin fail-monitor run
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run ( depend on JobTriggerPoolHelper )
JobCompleteHelper.getInstance().start();
// admin log report start
JobLogReportHelper.getInstance().start();
// start-schedule ( depend on JobTriggerPoolHelper )
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
进入到JobScheduleHelper的start() 方法, start()方法初始化了2个线程:
1) scheduleThread, 读取xxl_job_info的status为1的所有任务并通过pushTimeRing(int ringSecond, int jobId)方法将 JobId和下次执行时间放入到时间轮里,同时根据cron表达式刷新下次执行时间。
注: ringData是通过时间戳的取余计算出来的,以一分钟为刻度,每一秒可以作为一个key, 如果有相同的key,那么计算出来的值会放在Map的value,即List<Integer>里。
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
2) ringThread, 轮询时间轮,取出JobId和下次执行时间,触发Trigger。
进入scheduleThread的run方法里,执行查询xxl_job_info表 status为1的记录:
进入到pushTimeRing, TimeRing 是一个时间轮。
TimeRing 用来存放触发时间和JobId的组合。
TimeRing
JobScheduleHelper的start()方法里scheduleThread 将任务放到时间轮里,ringThread的daemon线程处理时间轮里的任务,时间轮需要一个线程去轮询执行,类似于kafka的时间轮机制,也就是遍历ringItemData , 然后挨个去触发Trigger。
存放任务
ringData是一个map, key 为任务的时间戳,JobId为任务id, 如果相同时间内有多个任务,那么用List<Integer>存放任务Id列表。
private void pushTimeRing(int ringSecond, int jobId)
// push async ring
List<Integer> ringItemData = ringData.get(ringSecond);
if (ringItemData == null)
ringItemData = new ArrayList<Integer>();
ringData.put(ringSecond, ringItemData);
ringItemData.add(jobId);
logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
取出任务
根据当前时间取出ringData里的任务id列表,然后轮询任务id列表,轮询执行trigger。
// ring thread
ringThread = new Thread(new Runnable()
@Override
public void run()
while (!ringThreadToStop)
// align second
try
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
catch (InterruptedException e)
if (!ringThreadToStop)
logger.error(e.getMessage(), e);
try
// second data
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++)
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null)
ringItemData.addAll(tmpData);
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0)
// do trigger
for (int jobId: ringItemData)
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
// clear
ringItemData.clear();
catch (Exception e)
if (!ringThreadToStop)
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:", e);
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
);
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
最终进入到ExecutorBizImpl的run()方法
和上述执行一次的逻辑一样会进入到XxlJobExecutor.registJobThread(int jobId, IJobHandler handler, String removeOldReason)方法,JobThread启动,调用目标方法,核心流程结束。
以上是关于XxlJob 分布式定时任务XxlJob用法及核心调度源码详解的主要内容,如果未能解决你的问题,请参考以下文章
xxljob在使用过程中(每五分钟执行一次)前期正常,中间出现过一次卡住的情况,定时任务从某个时间