源码分析XXL-JOB的执行器的注册流程

Posted xbhog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码分析XXL-JOB的执行器的注册流程相关的知识,希望对你有一定的参考价值。

目的:分析xxl-job执行器的注册过程

流程:

  1. 获取执行器中所有被注解(@xxlJjob)修饰的handler
  2. 执行器注册过程
  3. 执行器中任务执行过程

版本:xxl-job 2.3.1

建议:下载xxl-job源码,按流程图debug调试,看堆栈信息并按文章内容理解执行流程

完整流程图:

查找Handler任务

部分流程图:

首先启动管理台界面(服务XxlJobAdminApplication),然后启动项目中给的执行器实例(SpringBoot);

这个方法是扫描项目中使用@xxlJob注解的所有handler方法。接着往下走

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) 
    if (applicationContext == null) 
        return;
    
    //获取该项目中所有的bean,然后遍历
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) 
        Object bean = applicationContext.getBean(beanDefinitionName);

        Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
        try 
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() 
                        //注意点★
                        @Override
                        public XxlJob inspect(Method method) 
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        
                    );
         catch (Throwable ex) 
            logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
        
        //没有跳过本次循环继续
        if (annotatedMethods==null || annotatedMethods.isEmpty()) 
            continue;
        
    	//获取了当前执行器中所有@xxl-job的方法,获取方法以及对应的初始化和销毁方法
        for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) 
            Method executeMethod = methodXxlJobEntry.getKey();
            XxlJob xxlJob = methodXxlJobEntry.getValue();
            // regist
            registJobHandler(xxlJob, bean, executeMethod);
        
    

Spring案例执行器中有5个handler:

XxlJobExecutor.registJobHandler()中部分源码

String name = xxlJob.value();
//make and simplify the variables since they\'ll be called several times later
Class<?> clazz = bean.getClass();
String methodName = executeMethod.getName();
if (name.trim().length() == 0) 
    throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");

if (loadJobHandler(name) != null) 
    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");

然后进行遍历注册;开始进行名字判断:

  1. 判断bean名字是否为空
  2. 判断bean是否被注册了(存在了)

loadJobHandler校验方式会去该方法中查找:当bean注册完成后时存放到jobHandlerRepository一个map类型中;

private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler loadJobHandler(String name)
    return jobHandlerRepository.get(name);

executeMethod.setAccessible(true);它实现了修改对象访问权限的功能,参数为true,则表示允许调用方在使用反射时忽略Java语言的访问控制检查.

往后走会判断该注解的生命周期方法(init和destroy)

  1. 未设置生命周期,则直接开始注册
//注意MethodJobHandler,后面会用到
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
//添加执行器名字及对应的hob方法信息(当前类、方法、init和destroy属性)
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler)
    logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:, jobHandler:", name, jobHandler);
    return jobHandlerRepository.put(name, jobHandler);

  1. 有生命周期,设置init和destroy方法权限
if (xxlJob.init().trim().length() > 0) 
    try 
        initMethod = clazz.getDeclaredMethod(xxlJob.init());
        initMethod.setAccessible(true);
     catch (NoSuchMethodException e) 
        throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
    

if (xxlJob.destroy().trim().length() > 0) 
    try 
        destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
        destroyMethod.setAccessible(true);
     catch (NoSuchMethodException e) 
        throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
    

首先检查@XxlJob注解中的init属性是否存在且不为空。如果存在,则尝试获取该类中名为init的方法,并将其设置为可访问状态,以便后续调用。

同理,代码接下来也检查了@XxlJob注解中的destroy属性是否存在且不为空,如果是,则获取该类中名为destroy的方法,并设置其为可访问状态

在这个过程中,如果某个方法不存在或者无法被访问,则会抛出NoSuchMethodException异常,并且使用throw new RuntimeException将其包装并抛出一个运行时异常。这样做的目的是为了提醒开发人员在任务处理器类中正确地设置init和destroy属性,并确保方法名称与属性值一致。

执行器的注册过程

部分流程图:

public void afterSingletonsInstantiated() 

    // init JobHandler Repository
    /*initJobHandlerRepository(applicationContext);*/

    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    try 
        super.start();
     catch (Exception e) 
        throw new RuntimeException(e);
    

在扫描完执行器中所有的任务后,开始进行执行器注册XxlJobSpringExecutor中的super.start() 方法。

在初始化执行服务器启动之前,进行了四种操作,初始化日志、初始化adminBizList地址(可视化管理台地址)、初始化日志清除、初始化回调线程等。

这里需要注意的是第二步初始化地址,在初始化服务器启动的时候需要用到。

private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception 

    // fill ip port
    port = port>0?port: NetUtil.findAvailablePort(9999);
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

    // generate address
    if (address==null || address.trim().length()==0) 
        String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
        address = "http://ip_port/".replace("ip_port", ip_port_address);
    

    // accessToken
    if (accessToken==null || accessToken.trim().length()==0) 
        logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
    

    // start
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);

继续到initEmbedServer,开始初始化ip地址和端口等,需要明白的是,这一步的参数获取方式其实是第一步读取**XxlJobConfig**获得的;进行ip的校验和拼接等操作,开始进行真正的注册。

创建一个嵌入式的HTTP服务器,将当前执行器信息(包含应用名称和IP地址端口等)注册到注册中心,注册方式的实现在ExecutorRegistryThread中实现。

校验名字和注册中心,如果注册中心不可用,则等待一段时间后重新尝试连接。

// registry
while (!toStop) 
    try 
        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) 
            try 
                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) 
                    registryResult = ReturnT.SUCCESS;
                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:, registryResult:", new Object[]registryParam, registryResult);
                    break;
                 else 
                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:, registryResult:", new Object[]registryParam, registryResult);
                
             catch (Exception e) 
                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:", registryParam, e);
            

        
     catch (Exception e) 
        if (!toStop) 
            logger.error(e.getMessage(), e);
        

    

    try 
        //心跳检测,默认30s
        if (!toStop) 
            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
        
     catch (InterruptedException e) 
        if (!toStop) 
            logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:", e.getMessage());
        
    

开启一个新线程,首先构建注册参数(包含执行器分组、执行器名字、执行器本地地址及端口号),遍历注册中心地址,开始进行执行器注册,注册方式通过发送http的post请求。

@Override
public ReturnT<String> registry(RegistryParam registryParam) 
    return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);

debug的过程中,XxlJobRemotingUtil 执行到int statusCode = connection.getResponseCode();才会跳转到JobApiController.api中的注册地址.

// services mapping
if ("callback".equals(uri)) 
    List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
    return adminBiz.callback(callbackParamList);
 else if ("registry".equals(uri)) 
    RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
    return adminBiz.registry(registryParam);
 else if ("registryRemove".equals(uri)) 
    RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
    return adminBiz.registryRemove(registryParam);
 else 
    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");

最后进入到JobRegistryHelper.registry()方法中完成数据库的入库和更新操作。

通过更新语句判断该执行器是否注册,结果小于1,那么保存注册器信息,并向注册中心发送一个请求,更新当前执行器所属的应用名称、执行器名称和 IP 地址等信息,否则跳过。

public ReturnT<String> registry(RegistryParam registryParam) 
	//.......
    // async execute
    registryOrRemoveThreadPool.execute(new Runnable() 
        @Override
        public void run() 
            //更新注册表信息
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
            if (ret < 1) 
                //保存执行器注册信息
                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

                // fresh 刷新执行器状态
                freshGroupRegistryInfo(registryParam);
            
        
    );

    return ReturnT.SUCCESS;

至此执行器的注册流程分析完成。

执行器中的任务执行过程

部分流程图:

执行器中的任务流程比较简单,如果执行器启动的话,那么每次执行任务是通过JobThread通过Cron 表达式进行操作的。

通过handler.execute()进行执行,是在框架内部通过反射机制调用作业处理器对象 handler 中的 execute() 方法实现的。在这个过程中,handler 对象表示被加载的作业处理器,并且已经调用了init()方法进行初始化。

method.invoke() 方法使用反射机制调用指定对象 target 中的方法 method。在这个方法中,target 表示作业处理器对象,method 表示作业处理器中的 execute() 方法。

通过上述方法,获取到SampleXxlJob.demoJobHandler的任务,然后开始进行任务逻辑操作。

XXL-JOB分布式任务调度框架-源码分析-调度中心对执行器的上下线感知实现原理


文章目录

1.引言

在前面三篇文章内容中,我们已经获取到了一个XXL-JOB的集群,以及一个可以执行任务的调度器,同时,在实际的项目中可以参照这个流程,引入定时任务。

接下来,我们就可以探索一下调度中心对执行器的上下线感知实现原理,主要包括以下几点:

  • 执行器注册流程
  • 执行器的注销流程
  • 调度中心探活流程

在运行过程中,调度中心要对执行器进行调度,得先获取到执行器的信息,才能根据信息发起调度请求,同时,我们又不希望因调度中心调用到已宕机的执行器而导致程序异常。

于是,XXL-JOB在调度中心中,维护了一个注册中心,通过xxl_job_registry这张表来实现的,调度中心每次发起调度请求时,都会通过这张表中的数据来做负载均衡。那么,只需要做到将活跃的执行器信息注册上去,并在执行器停机或宕机后,将其从注册中心中移除,这样,调度中心就获得了对执行器的上线下感知。

2.调度关系

定时任务是如何被调用的呢?

我们先看一个分层结构图,XXL-JOB的调度关系分为了3层,每层向下进行调度,最上层是调度中心,最下层是定时任务需要执行的方法,调度中心可以调度不同的执行器,执行器再调用归属于自己的定时任务,如下图所示:

调度中心在调度执行器时,需要知道执行器的ip和端口号,以此来找到对应的执行器节点来进行调度。而调度中心获取到执行器ip的方式有两种,分别是:自动注册手动录入

一般不会使用手动录入的方式,为什么呢?可以想象一下,在新增、减少了执行器实例,执行器宕机时,都需要手动修改机器地址,意味着需要有人24小时盯着,这是一件很可怕的事。

所以,正常情况下我们都会选择使用自动注册的方式来创建,选择这种方式的话,就需要调度中心与执行器之间建立通信机制,通过网络请求传输注册信息。

注:下面是在后台管理系统中的配置。

3.执行器注册

当前版本(2.3.1)的XXL-JOB采用的是Http通信,而调度中心是通过SpringBoot来实现的。实际上,调度中心就是启动了一个Tomcat,并提供了执行器注册接口。执行器在启动的时候就会调用这个接口,将自己的ip,端口等信息传输到调度中心,再由调度中心存入数据库中,这样就完成了执行器注册。

3.1.调度中心处理注册请求

首先,需要调度中心向外暴露的注册接口位置。

XXL-JOB项目中的的命名还是比较规范的,我们可以在xxl-job-admincontoller包中去搜索,很容易找到一个api相关的Controller接口JobApiController,进入到这个类中。
果然,在这个类里面有一个api相关的方法,如下:

/**
 * api
 *
 * @param uri
 * @param data
 * @return
 */
@RequestMapping("/uri")
@ResponseBody
@PermissionLimit(limit=false)
public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) 

    // valid
    if (!"POST".equalsIgnoreCase(request.getMethod())) 
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    
    if (uri==null || uri.trim().length()==0) 
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    
    if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
            && XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
            && !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) 
        return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
    

    // services mapping
    if ("callback".equals(uri)) 
        List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
        return adminBiz.callback(callbackParamList);
     else if ("registry".equals(uri)) 
        RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
        //服务注册
        return adminBiz.registry(registryParam);
     else if ("registryRemove".equals(uri)) 
        RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
        return adminBiz.registryRemove(registryParam);
     else 
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
    


这是一个RESTFUL接口,包含了三个策略路径,分别是:callback,registry,registryRemove,通过语义,可以大胆的猜测registry这条路径就是注册操作,registryRemove是注销操作,而callback是执行器的执行结果回调(本篇暂不关注回调接口)。

我们通过registry的路径一路顺藤摸瓜,就找到了实际做注册动作的方法

JobRegistryHelper

// ---------------------- helper ----------------------

public ReturnT<String> registry(RegistryParam registryParam) 

// valid
if (!StringUtils.hasText(registryParam.getRegistryGroup())
		|| !StringUtils.hasText(registryParam.getRegistryKey())
		|| !StringUtils.hasText(registryParam.getRegistryValue())) 
	return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");


// async execute
registryOrRemoveThreadPool.execute(new Runnable() 
	@Override
	public void run() 
		int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
		if (ret < 1) 
			XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

			// fresh
			freshGroupRegistryInfo(registryParam);
		
	
);

return ReturnT.SUCCESS;

上图中的就是一个简单的saveOrUpdate方法,只是这里用异步来做了,很好理解:

  • getXxlJobRegistryDao():表示获取xxl_job_registry这张表对应的dao对象。
  • registrySave:创建一条执行器注册数据。
  • registryUpdate:更新执行器信息,这个操作是用来维持心跳连接的。

简单的说,就是调度中心会接收执行器的registry请求,然后将请求中传入的参数保存到xxl_job_registy表中。这就是调度中心运行的执行器注册主流程,一个非常简单的CRUD

看完了主流程之后,我们再来看一下细节,可以发现这里的注册代码并不是同步执行的,而是通过一个线程池registryOrRemoveThreadPool来进行的异步操作。这里也体现了XXL-JOB的一个设计思想,即全异步化调用,我们在研究后续原理的时候,还会经常看到这样的用法。

registryOrRemoveThreadPool的创建

registryOrRemoveThreadPool这个线程池是项目启动时提前创建好的,通过Ideausages可以找到,选中代码中的registryOrRemoveThreadPool使用快捷键alt+F7,可以打开下图所示的界面,找到一个new ThreadPoolExecutor()的地方,这就创建线程池位置。

这里还可以进一步查看XXL-JOB的配置初始化过程,使用alt+鼠标左键查看start()方法的使用位置。

可以看到这里做了各种各样的初始化操作,后续想了解XXL-JOB中的某个流程的话,就可以以这里的初始化操作为线索,找到对应的代码流程,在后续的源码探索中,还会多次进入这个位置。

3.2.执行器发起注册请求

所谓的执行器,实际上就是一个引入了xxl-job-core包的Spring-Web项目,在上一篇的内容中,我们在代码中只写了一个@XxlJob注解就完成了一个定时任务方法,就是因为大部分工作都是由xxl-job-core这个包来完成的,现在我们可以去探索一下,执行器是如何注册到调度中心的。

在上面xxl-job-admin中的注册接口吗,在这个接口中使用了一个AdminBiz接口,进入到这个接口中,找到registry方法,它有两个实现:

  • 一个在xxl-job-admin中,这是上面已经讲到的调度中心实现注册的方法。
  • 另一个在xxl-job-core中,这就是执行器发起注册请求的位置。

我们可以通过注册请求倒推回去,可以找到一个ExecutorRegistryThread

上图红框中的内容展示了是通过appNameaddress组成了一个请求参数,然后将这个参数传输到了xxl-job-admin中,这就是执行器注册的入口。这里可以注意一下while(!toStop),说明当前的registryThread线程会循环调用注册方法,还记得上面的registryUpdate吗?

我们说这个是用来维持心跳连接的,那么心跳请求是多长时间发送一次呢?可以把代码往下拉:


通过XXL-JOB的架构,我们已经知道在执行器启动之后,需要调度中心的来做任务调度,而调度中心需要知道执行器的标识以及IP地址、端口,才能对指定的执行器发送调度请求。这也是为什么上图中的请求参数中会有appNameaddress

既然执行器把地址交给了调度中心,很自然的可以想到,在交出地址之前,执行器会按照这个地址启动一个供调度中心调用的web服务。

继续往外层跳,可以找到web服务的启动代码,这里使用的是netty

综上,执行器这边的主要流程可以通过一张简图来表示:

4.执行器注销

执行器的注销分为主动注销和被动注销两种。

  • 主动注销:顾名思义,就是执行器向调度中心发送注销请求,调度中心接收后把这个执行器的注册信息删除掉。
  • 被动注销:就是执行器以外宕机后,无法正常的向调度中心发送注销请求,由调度中心的探活线程发现了某个执行器已下线,此时将该执行器的注册信息删除掉。

4.1.主动注销

主动注销的发起时机是在Spring容器正常关闭时,XXL-JOB的执行器类XxlJobSpringExecutor实现了DisposableBean接口,这个接口提供了一个destory方法。

在后续的流程中,会停止Netty服务,中断探活线程,并向调度中心发送removeRegistry请求。

stop的状态修改后,这里的探活循环就会停止,进而会调用到下面的registryRemove方法。

调度中心收到请求后,也会通过registryOrRemoveThreadPool线程池进行异步处理,最终将xxl_job_registry中对应的执行器信息删除掉。

4.2.被动注销

调度中心初始化时,会启动一个监控线程registryMonitorThread,这个线程每30秒会触发一次探活操作(即每循环一次sleep 30秒),探活操作触发时会查询xxl_job_registry表中的数据,将update_time与当前时间的差值大于90s的数据查询出来,将这部分数据删掉掉。

把sleep的时间差也考虑进去的话,就是执行器在最多120秒内都没有发送新的注册请求来维持心跳的话,这个执行器就会被调度中心注销掉。

心跳是怎么维持的呢?

看了上面执行器发起注册的流程,大概也能猜到了,执行器里面的registryThread每30秒会调用一次调度中心的注册接口,调度中心收到请求后,更新update-time的值。

5.流程图

经过上面的探索,我们已经了解了执行器的注册与注销的流程,下面是这整个流程的流程图。

6. 总结

本篇内容主要是在探索执行器注册到调度中心的流程以及代码实现,流程如下:

  1. 调度中心启动了一个Tomcat作为Web容器,暴露出注册与注销的接口,可以供执行器调用。
  2. 执行器在启动Netty服务暴露出调度接口后,将自己的nameip、端口信息通过调度中心的注册接口传输到调度中心,同时每30秒会调用一次注册接口,用于更新注册信息。
  3. 同理,在执行器停止的时候,也会请求调度中心的注销接口,进行注销。
  4. 调度中心在接收到注册或注销请求后,会操作xxl_job_registry表,新增或删除执行器的注册信息。
  5. 调度中心会启动一个探活线程,将90秒都没有更新注册信息的执行器删除掉。

由于本篇只是在探索注册与发现的流程,所以忽略在这个流程中还涉及到的任务调度与回调相关的逻辑,这部分逻辑将在下一篇调度流程原理分析中讲到。

以上是关于源码分析XXL-JOB的执行器的注册流程的主要内容,如果未能解决你的问题,请参考以下文章

XXL-JOB分布式任务调度框架-源码分析-任务调度执行流程及实现原理

xxl-job的实现原理(源码分析)

xxl-job的实现原理(源码分析)

XXL-JOB分析(一任务执行的过程源码分析)

xxl-job分析

xxl-job后继任务导致前一个任务执行一半,源码分析xxljob