xxx-job调度中心源码分析

Posted sharedCode

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了xxx-job调度中心源码分析相关的知识,希望对你有一定的参考价值。

调度中心启动源码分析

xxl-job 主要分为两部分,调度中心,和执行器,这两块,这里闲分析一下调度中心的源码

首先从spring的配置看起, 从以下配置可以看出,xxl内部使用的是quartz

spring配置

 1<bean id="quartzScheduler" lazy-init="false" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
2   <property name="dataSource" ref="dataSource" />
3   <property name="autoStartup" value="true" />         <!--自动启动 -->
4   <property name="startupDelay" value="20" />             <!--延时启动,应用启动成功后在启动 -->
5   <property name="overwriteExistingJobs" value="true" /> <!--覆盖DB中JOB:true、以数据库中已经存在的为准:false -->
6   <property name="applicationContextSchedulerContextKey"  value="applicationContextKey" />
7   <property name="configLocation" value="classpath:quartz.properties"/>
8</bean>
9<!-- 这个调度中心,在启动的时候,会做很多初始化的工作 ,比如:执行器信息,注册机器列表等信息 -->
10<bean id="xxlJobDynamicScheduler" class="com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler" init-method="init" destroy-method="destroy" >
11    <!-- 配置调度中心的名称 -->
12   <property name="scheduler" ref="quartzScheduler"/>
13    <!-- 用于调度中心和执行器之间通信的时候做数据加密 -->
14   <property name="accessToken" value="${xxl.job.accessToken}" />
15</bean>

com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler  在启动的时候会做如下工作:

 1public void init() throws Exception {
2    // 启动自动注册线程, 获取类型为自动注册的执行器信息,完成机器的自动注册与发现
3    JobRegistryMonitorHelper.getInstance().start();
4
5    // 启动失败日志监控线程
6    JobFailMonitorHelper.getInstance().start();
7
8    // admin-server(spring-mvc)
9    NetComServerFactory.putService(AdminBiz.class, XxlJobDynamicScheduler.adminBiz);
10    NetComServerFactory.setAccessToken(accessToken);
11
12    // valid
13    Assert.notNull(scheduler, "quartz scheduler is null");
14    logger.info(">>>>>>>>> init xxl-job admin success.");
15}

JobRegistryMonitorHelper.getInstance().start()   详细代码如下:

 1JobRegistryMonitorHelper
2public void start()
{
3   //创建一个线程
4   registryThread = new Thread(new Runnable() {
5      @Override
6      public void run() {
7         // 当toStop 为false时进入该循环。
8         while (!toStop) {
9            try {
10               // 获取类型为自动注册的执行器地址列表
11               List<XxlJobGroup> groupList = XxlJobDynamicScheduler.xxlJobGroupDao.findByAddressType(0);
12               if (CollectionUtils.isNotEmpty(groupList)) {
13
14                  // 删除 90秒之内没有更新信息的注册机器, 90秒没有心跳信息返回,代表机器已经出现问题,故移除
15                  XxlJobDynamicScheduler.xxlJobRegistryDao.removeDead(RegistryConfig.DEAD_TIMEOUT);
16
17                  // fresh online address (admin/executor)
18                  HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
19                  // 查询在90秒之内有过更新的机器列表
20                  List<XxlJobRegistry> list = XxlJobDynamicScheduler.xxlJobRegistryDao.findAll(RegistryConfig.DEAD_TIMEOUT);
21                  if (list != null) {
22                     //循环注册机器列表,  根据执行器不同,将这些机器列表区分拿出来
23                     for (XxlJobRegistry item: list) {
24                        // 判断该机器注册信息RegistryGroup ,RegistType
25                        //是否是EXECUTOR , EXECUTOR 代表该机器是注册到执行器上面的
26                        // RegistType  分为两种, ADMIN 和EXECUTOR
27
28
29                 if(RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
30                           // 获取注册的执行器 KEY  (也就是执行器)
31                           String appName = item.getRegistryKey();
32                           List<String> registryList = appAddressMap.get(appName);
33                           if (registryList == null) {
34                              registryList = new ArrayList<String>();
35                           }
36
37                           if (!registryList.contains(item.getRegistryValue())) {
38                              registryList.add(item.getRegistryValue());
39                           }
40                           // 收集 机器信息,根据执行器做区分
41                           appAddressMap.put(appName, registryList);
42                        }
43                     }
44                  }
45
46                  //  遍历执行器列表
47                  for (XxlJobGroup group: groupList) {
48                     // 通过执行器的APP_NAME  拿出他下面的集群机器地址
49                     List<String> registryList = appAddressMap.get(group.getAppName());
50                     String addressListStr = null;
51                     if (CollectionUtils.isNotEmpty(registryList)) {
52                        Collections.sort(registryList);
53                        // 转为为String, 通过逗号分隔
54                        addressListStr = StringUtils.join(registryList, ",");
55                     }
56                     group.setAddressList(addressListStr);
57                     // 将 这个执行器的 集群机器地址列表,写入到数据库
58                     XxlJobDynamicScheduler.xxlJobGroupDao.update(group);
59                  }
60               }
61            } catch (Exception e) {
62               logger.error("job registry instance error:{}", e);
63            }
64            try {
65               TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
66            } catch (InterruptedException e) {
67               logger.error("job registry instance error:{}", e);
68            }
69         }
70      }
71   });
72   registryThread.setDaemon(true);
73   //启动线程
74   registryThread.start();
75}

JobFailMonitorHelper.getInstance().start(); 详细代码如下:

 1public void start(){
2   // 启动线程
3 monitorThread = new Thread(new Runnable() {
4 @Override
5 public void run() {
6         // monitor
7 while (!toStop) {
8            try {
9               List<Integer> jobLogIdList = new ArrayList<Integer>();
10               // 从队列中拿出所有可用的 jobLogIds
11                int drainToNum = JobFailMonitorHelper.instance.queue.drainTo(jobLogIdList);
12               if (CollectionUtils.isNotEmpty(jobLogIdList)) {
13                  for (Integer jobLogId : jobLogIdList) {
14                     if (jobLogId==null || jobLogId==0) {
15                        continue;
16                     }
17                     //从数据库跟以前有日志信息
18                    XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
19                     if (log == null) {
20                        continue;
21                     }
22                     //任务触发成功, 但是JobHandle 还没有返回结果
23                    if (IJobHandler.SUCCESS.getCode() == log.getTriggerCode() && log.getHandleCode() == 0) {
24                        //将 JobLogId 放入队列 , 继续监控
25                         JobFailMonitorHelper.monitor(jobLogId);
26                        logger.info(">>>>>>>>>>> job monitor, job running, JobLogId:{}", jobLogId);
27                     } else if (IJobHandler.SUCCESS.getCode() == log.getHandleCode()) {
28                        // job success, pass
29                        logger.info(">>>>>>>>>>> job monitor, job success, JobLogId:{}", jobLogId);
30                     } else if (IJobHandler.FAIL.getCode() == log.getTriggerCode()
31                           || IJobHandler.FAIL.getCode() == log.getHandleCode()
32                           || IJobHandler.FAIL_RETRY.getCode() == log.getHandleCode() ) {
33                        // 任务执行失败, 执行发送邮件等预警措施
34                        failAlarm(log);
35                        logger.info(">>>>>>>>>>> job monitor, job fail, JobLogId:{}", jobLogId);
36                     } else {
37                        JobFailMonitorHelper.monitor(jobLogId);
38                        logger.info(">>>>>>>>>>> job monitor, job status unknown, JobLogId:{}", jobLogId);
39                     }
40                  }
41               }
42               // 停顿一下
43                TimeUnit.SECONDS.sleep(10);
44            } catch (Exception e) {
45               logger.error("job monitor error:{}", e);
46            }
47         }
48
49   });
50   monitorThread.setDaemon(true);
51   monitorThread.start();
52}

以上 是xxl-job 在启动的时候做的操作,  主要是启动两个线程,

用来监控自动注册上来的机器,达到自动注册的目的监控任务的执行状态, 如若失败,则发送邮件预警

xxl-job 是基于quartz 进行的二次开发,在系统启动的时候,quartz框架会自动去数据库读取相关的配置信息,载入相关定时器信息

RemotehttpJobBean 触发任务源码分析

xxl-job 所有的任务触发最终都是通过这个类来执行  , 该类继承关系如下:

RemoteHttpJobBean > QuartzJobBean > Job

当quartz监听到有任务需要触发是,会调用 JobRunShell 的run方法, 在该类的run方法中,会调用当前任务的JOB_CLASS 的excute方法,

调用链最终会调用到remoteHttpJobBean 的 executeInternal()

 1@Override
2protected void executeInternal(JobExecutionContext context)
3      throws JobExecutionException
{
4
5   // load jobId
6   JobKey jobKey = context.getTrigger().getJobKey();
7   Integer jobId = Integer.valueOf(jobKey.getName());
8
9   // trigger
10   XxlJobTrigger.trigger(jobId); // 详细的代码分析往下看
11}
12public static void trigger(int jobId) {
13
14    // 通过JobId从数据库中查询该任务的具体信息
15    XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId);              // job info
16    if (jobInfo == null) {
17        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
18        return;
19    }
20    // 获取该类型的执行器信息
21    XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup());  // group info
22
23    // 匹配运行模式
24    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
25    // 匹配失败后的处理模式
26    ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(), ExecutorFailStrategyEnum.FAIL_ALARM);    // fail strategy
27    //  获取路由策略
28    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
29    // 获取该执行器的集群机器列表
30    ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();
31
32    // 判断路由策略  是否为  分片广播模式
33    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum && CollectionUtils.isNotEmpty(addressList)) {
34        for (int i = 0; i < addressList.size(); i++) {
35            String address = addressList.get(i);
36            //定义日志信息
37            XxlJobLog jobLog = new XxlJobLog();
38            // .....省略
39            ReturnT<String> triggerResult = new ReturnT<String>(null);
40
41            if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
42                // 4.1、trigger-param
43                TriggerParam triggerParam = new TriggerParam();
44                triggerParam.setJobId(jobInfo.getId());
45                triggerParam.setBroadcastIndex(i); // 设置分片标记
46                triggerParam.setBroadcastIndex(addressList.size());// 设置分片总数
47                // ......省略组装参数的过程
48
49                // 根据参数以及 机器地址,向执行器发送执行信息 , 此处将会详细讲解runExecutor 这个方法
50                triggerResult = runExecutor(triggerParam, address);
51            }
52            // 将日志ID,放入队列,便于日志监控线程来监控任务的执行状态
53            JobFailMonitorHelper.monitor(jobLog.getId());
54            logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
55
56        }
57    } else {
58        // 出分片模式外,其他的路由策略均走这里
59        //定义日志信息
60        XxlJobLog jobLog = new XxlJobLog();
61        jobLog.setJobGroup(jobInfo.getJobGroup());
62        // .....省略
63        ReturnT<String> triggerResult = new ReturnT<String>(null);
64        if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
65            // 4.1、trigger-param
66            TriggerParam triggerParam = new TriggerParam();
67            triggerParam.setJobId(jobInfo.getId());
68            triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
69            triggerParam.setBroadcastIndex(0); // 默认分片标记为0
70            triggerParam.setBroadcastTotal(1);  // 默认分片总数为1
71            // .... 省略组装参数的过程
72            // 此处使用了策略模式, 根据不同的策略 使用不同的实现类,此处不再详细说明
73            triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
74        }
75        JobFailMonitorHelper.monitor(jobLog.getId());
76        logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
77    }
78}

继续往下面看, 着重看 runExecutor 这个方法 , 向执行器发送指令都是从这个方法中执行的

 1public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
2    ReturnT<String> runResult = null;
3    try {
4        //创建一个ExcutorBiz 的对象,重点在这个方法里面
5        ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
6        // 这个run 方法不会最终执行,仅仅只是为了触发 proxy object 的 invoke方法,同时将目标的类型传送给服务端, 因为在代理对象的invoke的方法里面没有执行目标对象的方法
7        runResult = executorBiz.run(triggerParam);
8    } catch (Exception e) {
9        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
10        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
11    }
12
13    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
14    runResultSB.append("<br>address:").append(address);
15    runResultSB.append("<br>code:").append(runResult.getCode());
16    runResultSB.append("<br>msg:").append(runResult.getMsg());
17
18    runResult.setMsg(runResultSB.toString());
19    runResult.setContent(address);
20    return runResult;
21}
 1public static ExecutorBiz getExecutorBiz(String address) throws Exception {
2    // valid
3    if (address==null || address.trim().length()==0) {
4        return null;
5    }
6
7    // load-cache
8    address = address.trim();
9    //查看缓存里面是否存在,如果存在则不需要再去创建executorBiz了
10    ExecutorBiz executorBiz = executorBizRepository.get(address);
11    if (executorBiz != null) {
12        return executorBiz;
13    }
14
15    // 创建ExecutorBiz的代理对象,重点在这个里面。
16    executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address, accessToken).getObject();
17    executorBizRepository.put(address, executorBiz);
18    return executorBiz;
19}

NetComClientProxy 这是一个factoryBean , 所以我们主要看他的getObject 方法就知道怎么创建对象并返回的。
下面这个代理对象的invoke里面并没有执行目标类的方法,而是将目标类的信息包装好,发送给执行器那一端来做。

 1public Object getObject() throws Exception {
2   return Proxy.newProxyInstance(Thread.currentThread()
3         .getContextClassLoader(), new Class[] { iface },
4         new InvocationHandler() {
5            @Override
6            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
7
8               if (Object.class.getName().equals(method.getDeclaringClass().getName())) {
9                  logger.error(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", method.getDeclaringClass().getName(), method.getName());
10                  throw new RuntimeException("xxl-rpc proxy class-method not support");
11               }
12
13               // 重点来了,创建request信息, 发送HTTP请求到执行器服务器上。
14               RpcRequest request = new RpcRequest();
15                    request.setServerAddress(serverAddress); // 服务器地址
16                    request.setCreateMillisTime(System.currentTimeMillis()); // 创建时间, 用于判断请求是否超时
17                    request.setAccessToken(accessToken);  // 数据校验
18                    request.setClassName(method.getDeclaringClass().getName()); // 将目标类的class名称传给执行器,让那边来创建对象,并执行逻辑代码
19                    request.setMethodName(method.getName());   // 方法名称为run
20                    request.setParameterTypes(method.getParameterTypes());  // 参数类型
21                    request.setParameters(args); // 参数
22                    RpcResponse response = client.send(request); // 发送HTTP请求
23               if (response == null) {
24                  logger.error(">>>>>>>>>>> xxl-rpc netty response not found.");
25                  throw new Exception(">>>>>>>>>>> xxl-rpc netty response not found.");
26                }
27                    if (response.isError()) {
28                        throw new RuntimeException(response.getError());
29                    } else {
30                        // 返回请求结果
31                        return response.getResult();
32                    }
33
34            }
35         });
36}

以上就是调度中心,触发任务之后执行的核心代码 , 下一小结讲会分析一下执行器那一块的代码,看下执行器收到请求之后会执行那些逻辑。


以上是关于xxx-job调度中心源码分析的主要内容,如果未能解决你的问题,请参考以下文章

XXL-JOB分布式任务调度框架-源码分析-任务调度执行流程及实现原理

XXL-JOB分析(一任务执行的过程源码分析)

Android 插件化VirtualApp 源码分析 ( 目前的 API 现状 | 安装应用源码分析 | 安装按钮执行的操作 | 返回到 HomeActivity 执行的操作 )(代码片段

Android 逆向整体加固脱壳 ( DEX 优化流程分析 | DexPrepare.cpp 中 dvmOptimizeDexFile() 方法分析 | /bin/dexopt 源码分析 )(代码片段

Spark源码分析之六:Task调度

Android 事件分发事件分发源码分析 ( Activity 中各层级的事件传递 | Activity -> PhoneWindow -> DecorView -> ViewGroup )(代码片段