XXL-JOB分析(一):任务执行过程源码分析
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了XXL-JOB分析(一任务执行的过程源码分析)相关的知识,希望对你有一定的参考价值。
xxl-job介绍:xxl-job是一个分布式任务调度系统,早期版本基于quartz实现调度器。
1、quartz是基于数据库for update实现锁,来保证同一个任务同一时间只会执行一次。
2、最新版本的xxl-job已经摒弃了quartz.
xxl-job核心模块
1、调度中心,也就是任务的管理系统
2、执行器,任务真正的执行服务,一般是分布式的服务。
任务执行过程
1、调度中心
1.1 点击执行的时候,入口JobInfoController.triggerJob方法:
进入JobTriggerPoolHelper.trigger方法,调用了JobTriggerPoolHelper.addTrigger方法,那么看一下addTrigger方法:
/**
* add trigger
*/
public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam)
// choose thread pool
//这里区分了快慢线程池,1分钟内超过500毫秒的请求大于10次,放入慢线程池处理
//快线程池默认8个核心线程,最大线程200,任务队列1000
//慢线程池默认0个核心线程,最大线程100,任务队列2000
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
// trigger
triggerPool_.execute(new Runnable()
@Override
public void run()
long start = System.currentTimeMillis();
try
//触发执行任务
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
catch (Exception e)
logger.error(e.getMessage(), e);
finally
// check timeout-count-map
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now)
minTim = minTim_now;
jobTimeoutCountMap.clear();
// incr timeout-count-map
long cost = System.currentTimeMillis()-start;
if (cost > 500) //耗时超过500ms就计算为慢请求,加入慢线程池。
AtomicInteger timeoutCount = jobTimeoutCountMap.put(jobId, new AtomicInteger(1));
if (timeoutCount != null)
timeoutCount.incrementAndGet();
);
进入执行任务:XxlJobTrigger.trigger
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam)
// load data
//通过JobId从数据库中查询该任务的具体信息
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (jobInfo == null)
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId=", jobId);
return;
if (executorParam != null)
jobInfo.setExecutorParam(executorParam);
int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
//获取该类型的执行器信息
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
// sharding param
//分片信息
int[] shardingParam = null;
if (executorShardingParam!=null)
String[] shardingArr = executorShardingParam.split("/");
if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1]))
shardingParam = new int[2];
shardingParam[0] = Integer.valueOf(shardingArr[0]);
shardingParam[1] = Integer.valueOf(shardingArr[1]);
//广播模式,循环执行器配置的服务地址列表
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null)
for (int i = 0; i < group.getRegistryList().size(); i++)
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
else
if (shardingParam == null)
shardingParam = new int[]0, 1;
//非广播模式进入
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
进入processTrigger方法,组装任务参数,选择路由和阻塞策略
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total)
//阻塞策略
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
//路由策略,官方一共10中路由策略
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
// 1、save log-id 日志信息
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
jobLog.setTriggerTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:", jobLog.getId());
// 2、init trigger-param 初始化任务参数
//这里要增加2个参数;
// 1、固定IP,
// 2、任务为空时,默认轮训队列次数
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);
triggerParam.setAssignAddress(jobInfo.getAssignAddress());
triggerParam.setJobEmptyLoopNum(jobInfo.getJobEmptyLoopNum());
// 3、init address 选择执行器服务地址
String address = null;
ReturnT<String> routeAddre***esult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty())
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum)
if (index < group.getRegistryList().size())
address = group.getRegistryList().get(index);
else
address = group.getRegistryList().get(0);
else
//根据路由策略,选择合适的执行器服务地址来执行任务
routeAddre***esult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddre***esult.getCode() == ReturnT.SUCCESS_CODE)
address = routeAddre***esult.getContent();
else
routeAddre***esult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
// 4、trigger remote executor
ReturnT<String> triggerResult = null;
if (address != null)
//远程调用,这个是重点方法
triggerResult = runExecutor(triggerParam, address);
else
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
// 5、collection trigger info
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
.append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
if (shardingParam != null)
triggerMsgSb.append("("+shardingParam+")");
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
.append((routeAddre***esult!=null&&routeAddre***esult.getMsg()!=null)?routeAddre***esult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
// 6、save log trigger-info
jobLog.setExecutorAddress(address);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setExecutorShardingParam(shardingParam);
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
//jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:", jobLog.getId());
在某些场景下,任务必须保证最多只执行一次——宁可不执行,也不允许重复执行。比如发放优惠券:可以少发或不发,但绝不能多发。这种情况下需要让任务固定在某一个执行器服务IP上执行,因此在原有路由策略的基础上新增了一个“固定IP”策略(对应上面代码中传入的assignAddress参数)。
组装好参数、选出执行任务的地址后,进入runExecutor方法。
runExecutor中会通过下面的getExecutorBiz(address)获取RPC客户端对象:创建时使用NettyHttp协议和HESSIAN序列化,并按地址缓存。拿到客户端对象后,就可以发起RPC请求了。
public static ExecutorBiz getExecutorBiz(String address) throws Exception
// valid
if (address==null || address.trim().length()==0)
return null;
// load-cache 是否在缓存中
address = address.trim();
ExecutorBiz executorBiz = executorBizRepository.get(address);
if (executorBiz != null)
return executorBiz;
// set-cache
// 创建ExecutorBiz的代理对象,重点在这个里面。
executorBiz = (ExecutorBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP, //nettyHttp
Serializer.SerializeEnum.HESSIAN.getSerializer(),//序列化
CallType.SYNC,//同步
LoadBalance.ROUND,
ExecutorBiz.class,
null,
5000,
address,
XxlJobAdminConfig.getAdminConfig().getAccessToken(),
null,
null).getObject();
executorBizRepository.put(address, executorBiz); //对象放入缓存
return executorBiz;
然后通过该代理对象发起RPC请求executorBiz.run。下面是执行器端收到该请求后执行的run方法:
public ReturnT<String> run(TriggerParam triggerParam)
// load old:jobHandler + jobThread
// 通过参数中的JobID, 从本地线程库里面获取线程 ( 第一次进来是没有线程的,jobThread为空 ,
// 本地线程库,本质上就是一个ConcurrentHashMap<Integer, JobThread>
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
// valid:jobHandler + jobThread
//运行模式,这里看一下java模式就可以了
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum)
// new jobhandler
// 通过参数中的handlerName从本地内存中获取handler实例
// (在执行器启动的时候,是把所有带有@JobHandler的实例通过name放入到一个map中的 )
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// valid old jobThread
// 如果修改了任务的handler, name此处会默认把以前老的handler清空,后面会以最新的newJobHandler为准
if (jobThread!=null && jobHandler != newJobHandler)
// change handler, need kill old thread
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
// valid handler
if (jobHandler == null)
jobHandler = newJobHandler;
if (jobHandler == null)
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum)
// valid old jobThread
if (jobThread != null &&
!(jobThread.getHandler() instanceof GlueJobHandler
&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() ))
// change handler or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
// valid handler
if (jobHandler == null)
try
IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
catch (Exception e)
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
else if (glueTypeEnum!=null && glueTypeEnum.isScript())
// valid old jobThread
if (jobThread != null &&
!(jobThread.getHandler() instanceof ScriptJobHandler
&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() ))
// change script or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
// valid handler
if (jobHandler == null)
jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
else
return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
// executor block strategy
if (jobThread != null)
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy)
// discard when running
if (jobThread.isRunningOrHasQueue())
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy)
// kill running jobThread
if (jobThread.isRunningOrHasQueue())
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
else
// just queue trigger
// replace thread (new or exists invalid)
// 如果jobThread为空,那么这个时候,就要注册一个线程到本地线程库里面去。
// 然后启动这个线程,线程会轮训任务队列开始执行,可以查看JobThread.run方法
if (jobThread == null)
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason, triggerParam.getJobEmptyLoopNum());
// push data to queue
// 任务线程已经存在了,将任务参数放入任务队列,每个任务线程有一个任务队列,任务线程去轮询这个任务
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
至此,一次任务触发从调度中心到执行器的调用链路基本分析完毕。接下来看执行器自身的启动和任务执行流程。
执行器的流程
1、执行器部署启动会执行XxlJobSpringExecutor.start方法:
随后执行XxlJobExecutor.start方法:
public void start() throws Exception
// init logpath 初始化本地日志路径
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client
// 初始化调度中心的地址列表,创建好adminBiz实例,调度中心客户端
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
//启动执行器服务,默认开端口9999
initRpcProvider(ip, port, appName, accessToken);
RPC服务启动了,就可以正常提供执行器服务了。
任务线程如何处理任务。
JobThread就是一个线程
重点看一下JobThread的run方法:它循环从任务队列中获取任务,每次poll的超时时间是3秒;如果连续空轮询超过一定次数(官方默认30次)仍没有任务,就停止该线程。这里的30次已经做了定制化修改,改为由调度中心传入的jobEmptyLoopNum参数控制。
public void run()
// init
try
//初始化任务对象
handler.init();
catch (Throwable e)
logger.error(e.getMessage(), e);
// execute
while(!toStop)
running = false;
idleTimes++;//累加轮询次数
TriggerParam triggerParam = null;
ReturnT<String> executeResult = null;
try
//获取队列里的任务,设置3秒钟超时
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam!=null)
running = true;
idleTimes = 0;
triggerLogIdSet.remove(triggerParam.getLogId());
// log filename, like "logPath/yyyy-MM-dd/9999.log"
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
XxlJobFileAppender.contextHolder.set(logFileName);
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
// execute
XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
//带超时的任务执行
if (triggerParam.getExecutorTimeout() > 0)
// limit timeout
Thread futureThread = null;
try
final TriggerParam triggerParamTmp = triggerParam;
FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>()
@Override
public ReturnT<String> call() throws Exception
//执行任务
return handler.execute(triggerParamTmp.getExecutorParams());
);
//启动线程执行任务
futureThread = new Thread(futureTask);
futureThread.start();
//获取执行任务结果
executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
catch (TimeoutException e)
XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
XxlJobLogger.log(e);
executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
finally
futureThread.interrupt();
else
// just execute
//仅仅执行任务
executeResult = handler.execute(triggerParam.getExecutorParams());
if (executeResult == null)
executeResult = IJobHandler.FAIL;
else
executeResult.setMsg(
(executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
?executeResult.getMsg().substring(0, 50000).concat("...")
:executeResult.getMsg());
executeResult.setContent(null); // limit obj size
XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
else
//超过一定次数,清空线程,并设置JobThread的stop停止标识位,终止轮询。也就是3*jobEmptyLoopNum秒空轮询
XxlJobLogger.log("<br>----------- xxl-job loop num diy set Param:" + jobEmptyLoopNum);
if (idleTimes > jobEmptyLoopNum)
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
catch (Throwable e)
if (toStop)
XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
String errorMsg = stringWriter.toString();
executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
finally
if(triggerParam != null)
// callback handler info
if (!toStop)
// commonm
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), executeResult));
else
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running,killed]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
// callback trigger request in queue
while(triggerQueue !=null && triggerQueue.size()>0)
TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null)
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
// destroy
try
handler.destroy();
catch (Throwable e)
logger.error(e.getMessage(), e);
logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:", Thread.currentThread());
以上是关于XXL-JOB分析(一任务执行的过程源码分析)的主要内容,如果未能解决你的问题,请参考以下文章
xxl-job后继任务导致前一个任务执行一半,源码分析xxljob
XXL-JOB分布式任务调度框架-源码分析-任务调度执行流程及实现原理