xxx-job调度中心源码分析
Posted sharedCode
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了xxx-job调度中心源码分析相关的知识,希望对你有一定的参考价值。
调度中心启动源码分析
xxl-job 主要分为两部分,调度中心,和执行器,这两块,这里闲分析一下调度中心的源码
首先从spring的配置看起, 从以下配置可以看出,xxl内部使用的是quartz
spring配置
1<bean id="quartzScheduler" lazy-init="false" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
2 <property name="dataSource" ref="dataSource" />
3 <property name="autoStartup" value="true" /> <!--自动启动 -->
4 <property name="startupDelay" value="20" /> <!--延时启动,应用启动成功后在启动 -->
5 <property name="overwriteExistingJobs" value="true" /> <!--覆盖DB中JOB:true、以数据库中已经存在的为准:false -->
6 <property name="applicationContextSchedulerContextKey" value="applicationContextKey" />
7 <property name="configLocation" value="classpath:quartz.properties"/>
8</bean>
9<!-- 这个调度中心,在启动的时候,会做很多初始化的工作 ,比如:执行器信息,注册机器列表等信息 -->
10<bean id="xxlJobDynamicScheduler" class="com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler" init-method="init" destroy-method="destroy" >
11 <!-- 配置调度中心的名称 -->
12 <property name="scheduler" ref="quartzScheduler"/>
13 <!-- 用于调度中心和执行器之间通信的时候做数据加密 -->
14 <property name="accessToken" value="${xxl.job.accessToken}" />
15</bean>
com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler 在启动的时候会做如下工作:
1public void init() throws Exception {
2 // 启动自动注册线程, 获取类型为自动注册的执行器信息,完成机器的自动注册与发现
3 JobRegistryMonitorHelper.getInstance().start();
4
5 // 启动失败日志监控线程
6 JobFailMonitorHelper.getInstance().start();
7
8 // admin-server(spring-mvc)
9 NetComServerFactory.putService(AdminBiz.class, XxlJobDynamicScheduler.adminBiz);
10 NetComServerFactory.setAccessToken(accessToken);
11
12 // valid
13 Assert.notNull(scheduler, "quartz scheduler is null");
14 logger.info(">>>>>>>>> init xxl-job admin success.");
15}
JobRegistryMonitorHelper.getInstance().start() 详细代码如下:
1JobRegistryMonitorHelper
2public void start(){
3 //创建一个线程
4 registryThread = new Thread(new Runnable() {
5 @Override
6 public void run() {
7 // 当toStop 为false时进入该循环。
8 while (!toStop) {
9 try {
10 // 获取类型为自动注册的执行器地址列表
11 List<XxlJobGroup> groupList = XxlJobDynamicScheduler.xxlJobGroupDao.findByAddressType(0);
12 if (CollectionUtils.isNotEmpty(groupList)) {
13
14 // 删除 90秒之内没有更新信息的注册机器, 90秒没有心跳信息返回,代表机器已经出现问题,故移除
15 XxlJobDynamicScheduler.xxlJobRegistryDao.removeDead(RegistryConfig.DEAD_TIMEOUT);
16
17 // fresh online address (admin/executor)
18 HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
19 // 查询在90秒之内有过更新的机器列表
20 List<XxlJobRegistry> list = XxlJobDynamicScheduler.xxlJobRegistryDao.findAll(RegistryConfig.DEAD_TIMEOUT);
21 if (list != null) {
22 //循环注册机器列表, 根据执行器不同,将这些机器列表区分拿出来
23 for (XxlJobRegistry item: list) {
24 // 判断该机器注册信息RegistryGroup ,RegistType
25 //是否是EXECUTOR , EXECUTOR 代表该机器是注册到执行器上面的
26 // RegistType 分为两种, ADMIN 和EXECUTOR
27
28
29 if(RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
30 // 获取注册的执行器 KEY (也就是执行器)
31 String appName = item.getRegistryKey();
32 List<String> registryList = appAddressMap.get(appName);
33 if (registryList == null) {
34 registryList = new ArrayList<String>();
35 }
36
37 if (!registryList.contains(item.getRegistryValue())) {
38 registryList.add(item.getRegistryValue());
39 }
40 // 收集 机器信息,根据执行器做区分
41 appAddressMap.put(appName, registryList);
42 }
43 }
44 }
45
46 // 遍历执行器列表
47 for (XxlJobGroup group: groupList) {
48 // 通过执行器的APP_NAME 拿出他下面的集群机器地址
49 List<String> registryList = appAddressMap.get(group.getAppName());
50 String addressListStr = null;
51 if (CollectionUtils.isNotEmpty(registryList)) {
52 Collections.sort(registryList);
53 // 转为为String, 通过逗号分隔
54 addressListStr = StringUtils.join(registryList, ",");
55 }
56 group.setAddressList(addressListStr);
57 // 将 这个执行器的 集群机器地址列表,写入到数据库
58 XxlJobDynamicScheduler.xxlJobGroupDao.update(group);
59 }
60 }
61 } catch (Exception e) {
62 logger.error("job registry instance error:{}", e);
63 }
64 try {
65 TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
66 } catch (InterruptedException e) {
67 logger.error("job registry instance error:{}", e);
68 }
69 }
70 }
71 });
72 registryThread.setDaemon(true);
73 //启动线程
74 registryThread.start();
75}
JobFailMonitorHelper.getInstance().start(); 详细代码如下:
1public void start(){
2 // 启动线程
3 monitorThread = new Thread(new Runnable() {
4 @Override
5 public void run() {
6 // monitor
7 while (!toStop) {
8 try {
9 List<Integer> jobLogIdList = new ArrayList<Integer>();
10 // 从队列中拿出所有可用的 jobLogIds
11 int drainToNum = JobFailMonitorHelper.instance.queue.drainTo(jobLogIdList);
12 if (CollectionUtils.isNotEmpty(jobLogIdList)) {
13 for (Integer jobLogId : jobLogIdList) {
14 if (jobLogId==null || jobLogId==0) {
15 continue;
16 }
17 //从数据库跟以前有日志信息
18 XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
19 if (log == null) {
20 continue;
21 }
22 //任务触发成功, 但是JobHandle 还没有返回结果
23 if (IJobHandler.SUCCESS.getCode() == log.getTriggerCode() && log.getHandleCode() == 0) {
24 //将 JobLogId 放入队列 , 继续监控
25 JobFailMonitorHelper.monitor(jobLogId);
26 logger.info(">>>>>>>>>>> job monitor, job running, JobLogId:{}", jobLogId);
27 } else if (IJobHandler.SUCCESS.getCode() == log.getHandleCode()) {
28 // job success, pass
29 logger.info(">>>>>>>>>>> job monitor, job success, JobLogId:{}", jobLogId);
30 } else if (IJobHandler.FAIL.getCode() == log.getTriggerCode()
31 || IJobHandler.FAIL.getCode() == log.getHandleCode()
32 || IJobHandler.FAIL_RETRY.getCode() == log.getHandleCode() ) {
33 // 任务执行失败, 执行发送邮件等预警措施
34 failAlarm(log);
35 logger.info(">>>>>>>>>>> job monitor, job fail, JobLogId:{}", jobLogId);
36 } else {
37 JobFailMonitorHelper.monitor(jobLogId);
38 logger.info(">>>>>>>>>>> job monitor, job status unknown, JobLogId:{}", jobLogId);
39 }
40 }
41 }
42 // 停顿一下
43 TimeUnit.SECONDS.sleep(10);
44 } catch (Exception e) {
45 logger.error("job monitor error:{}", e);
46 }
47 }
48
49 });
50 monitorThread.setDaemon(true);
51 monitorThread.start();
52}
以上 是xxl-job 在启动的时候做的操作, 主要是启动两个线程,
用来监控自动注册上来的机器,达到自动注册的目的监控任务的执行状态, 如若失败,则发送邮件预警
xxl-job 是基于quartz 进行的二次开发,在系统启动的时候,quartz框架会自动去数据库读取相关的配置信息,载入相关定时器信息
RemotehttpJobBean 触发任务源码分析
xxl-job 所有的任务触发最终都是通过这个类来执行 , 该类继承关系如下:
RemoteHttpJobBean > QuartzJobBean > Job
当quartz监听到有任务需要触发是,会调用 JobRunShell 的run方法, 在该类的run方法中,会调用当前任务的JOB_CLASS 的excute方法,
调用链最终会调用到remoteHttpJobBean 的 executeInternal()
1@Override
2protected void executeInternal(JobExecutionContext context)
3 throws JobExecutionException {
4
5 // load jobId
6 JobKey jobKey = context.getTrigger().getJobKey();
7 Integer jobId = Integer.valueOf(jobKey.getName());
8
9 // trigger
10 XxlJobTrigger.trigger(jobId); // 详细的代码分析往下看
11}
12public static void trigger(int jobId) {
13
14 // 通过JobId从数据库中查询该任务的具体信息
15 XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // job info
16 if (jobInfo == null) {
17 logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
18 return;
19 }
20 // 获取该类型的执行器信息
21 XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); // group info
22
23 // 匹配运行模式
24 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
25 // 匹配失败后的处理模式
26 ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(), ExecutorFailStrategyEnum.FAIL_ALARM); // fail strategy
27 // 获取路由策略
28 ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
29 // 获取该执行器的集群机器列表
30 ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();
31
32 // 判断路由策略 是否为 分片广播模式
33 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum && CollectionUtils.isNotEmpty(addressList)) {
34 for (int i = 0; i < addressList.size(); i++) {
35 String address = addressList.get(i);
36 //定义日志信息
37 XxlJobLog jobLog = new XxlJobLog();
38 // .....省略
39 ReturnT<String> triggerResult = new ReturnT<String>(null);
40
41 if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
42 // 4.1、trigger-param
43 TriggerParam triggerParam = new TriggerParam();
44 triggerParam.setJobId(jobInfo.getId());
45 triggerParam.setBroadcastIndex(i); // 设置分片标记
46 triggerParam.setBroadcastIndex(addressList.size());// 设置分片总数
47 // ......省略组装参数的过程
48
49 // 根据参数以及 机器地址,向执行器发送执行信息 , 此处将会详细讲解runExecutor 这个方法
50 triggerResult = runExecutor(triggerParam, address);
51 }
52 // 将日志ID,放入队列,便于日志监控线程来监控任务的执行状态
53 JobFailMonitorHelper.monitor(jobLog.getId());
54 logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
55
56 }
57 } else {
58 // 出分片模式外,其他的路由策略均走这里
59 //定义日志信息
60 XxlJobLog jobLog = new XxlJobLog();
61 jobLog.setJobGroup(jobInfo.getJobGroup());
62 // .....省略
63 ReturnT<String> triggerResult = new ReturnT<String>(null);
64 if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
65 // 4.1、trigger-param
66 TriggerParam triggerParam = new TriggerParam();
67 triggerParam.setJobId(jobInfo.getId());
68 triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
69 triggerParam.setBroadcastIndex(0); // 默认分片标记为0
70 triggerParam.setBroadcastTotal(1); // 默认分片总数为1
71 // .... 省略组装参数的过程
72 // 此处使用了策略模式, 根据不同的策略 使用不同的实现类,此处不再详细说明
73 triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
74 }
75 JobFailMonitorHelper.monitor(jobLog.getId());
76 logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
77 }
78}
继续往下面看, 着重看 runExecutor 这个方法 , 向执行器发送指令都是从这个方法中执行的
1public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
2 ReturnT<String> runResult = null;
3 try {
4 //创建一个ExcutorBiz 的对象,重点在这个方法里面
5 ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
6 // 这个run 方法不会最终执行,仅仅只是为了触发 proxy object 的 invoke方法,同时将目标的类型传送给服务端, 因为在代理对象的invoke的方法里面没有执行目标对象的方法
7 runResult = executorBiz.run(triggerParam);
8 } catch (Exception e) {
9 logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
10 runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
11 }
12
13 StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
14 runResultSB.append("<br>address:").append(address);
15 runResultSB.append("<br>code:").append(runResult.getCode());
16 runResultSB.append("<br>msg:").append(runResult.getMsg());
17
18 runResult.setMsg(runResultSB.toString());
19 runResult.setContent(address);
20 return runResult;
21}
1public static ExecutorBiz getExecutorBiz(String address) throws Exception {
2 // valid
3 if (address==null || address.trim().length()==0) {
4 return null;
5 }
6
7 // load-cache
8 address = address.trim();
9 //查看缓存里面是否存在,如果存在则不需要再去创建executorBiz了
10 ExecutorBiz executorBiz = executorBizRepository.get(address);
11 if (executorBiz != null) {
12 return executorBiz;
13 }
14
15 // 创建ExecutorBiz的代理对象,重点在这个里面。
16 executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address, accessToken).getObject();
17 executorBizRepository.put(address, executorBiz);
18 return executorBiz;
19}
NetComClientProxy 这是一个factoryBean , 所以我们主要看他的getObject 方法就知道怎么创建对象并返回的。
下面这个代理对象的invoke里面并没有执行目标类的方法,而是将目标类的信息包装好,发送给执行器那一端来做。
1public Object getObject() throws Exception {
2 return Proxy.newProxyInstance(Thread.currentThread()
3 .getContextClassLoader(), new Class[] { iface },
4 new InvocationHandler() {
5 @Override
6 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
7
8 if (Object.class.getName().equals(method.getDeclaringClass().getName())) {
9 logger.error(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", method.getDeclaringClass().getName(), method.getName());
10 throw new RuntimeException("xxl-rpc proxy class-method not support");
11 }
12
13 // 重点来了,创建request信息, 发送HTTP请求到执行器服务器上。
14 RpcRequest request = new RpcRequest();
15 request.setServerAddress(serverAddress); // 服务器地址
16 request.setCreateMillisTime(System.currentTimeMillis()); // 创建时间, 用于判断请求是否超时
17 request.setAccessToken(accessToken); // 数据校验
18 request.setClassName(method.getDeclaringClass().getName()); // 将目标类的class名称传给执行器,让那边来创建对象,并执行逻辑代码
19 request.setMethodName(method.getName()); // 方法名称为run
20 request.setParameterTypes(method.getParameterTypes()); // 参数类型
21 request.setParameters(args); // 参数
22 RpcResponse response = client.send(request); // 发送HTTP请求
23 if (response == null) {
24 logger.error(">>>>>>>>>>> xxl-rpc netty response not found.");
25 throw new Exception(">>>>>>>>>>> xxl-rpc netty response not found.");
26 }
27 if (response.isError()) {
28 throw new RuntimeException(response.getError());
29 } else {
30 // 返回请求结果
31 return response.getResult();
32 }
33
34 }
35 });
36}
以上就是调度中心,触发任务之后执行的核心代码 , 下一小结讲会分析一下执行器那一块的代码,看下执行器收到请求之后会执行那些逻辑。
以上是关于xxx-job调度中心源码分析的主要内容,如果未能解决你的问题,请参考以下文章
XXL-JOB分布式任务调度框架-源码分析-任务调度执行流程及实现原理
Android 插件化VirtualApp 源码分析 ( 目前的 API 现状 | 安装应用源码分析 | 安装按钮执行的操作 | 返回到 HomeActivity 执行的操作 )(代码片段
Android 逆向整体加固脱壳 ( DEX 优化流程分析 | DexPrepare.cpp 中 dvmOptimizeDexFile() 方法分析 | /bin/dexopt 源码分析 )(代码片段
Android 事件分发事件分发源码分析 ( Activity 中各层级的事件传递 | Activity -> PhoneWindow -> DecorView -> ViewGroup )(代码片段