xxl-job的实现原理(源码分析)

Posted 八阿哥克星

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了xxl-job的实现原理(源码分析)相关的知识,希望对你有一定的参考价值。

关于使用xxl-job的流程,这里不再赘述。在加入xxl-job依赖后,需要我们自己做的就是创建一个XxlJobSpringExecutor对象交给spring容器管理,因为XxlJobSpringExecutor实现了SmartInitializingSingleton接口,在spring管理的对象完成初始化之后,会执行SmartInitializingSingleton接口的afterSingletonsInstantiated()方法,XxlJobSpringExecutor重写了此方法;

1、初始化执行任务handler

​        initJobHandlerMethodRepository()方法中,会将spring管理的对象遍历一遍,找到被@Xxljob注解(加在类上的@JobHandler已经被摒弃)修饰的实例对象的方法,并把该方法作为key,作用在该实例上的@Xxljob注解作为value放在一个map集合中annotatedMethods;然后对annotatedMethods进行迭代,获取到相应的method和 @Xxljob注解的value值,也就是任务调度平台中jobhandler的值,接着会注册执行任务registJobHandler(..)到jobHandlerRepository中,以供后期使用;

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) 
        if (applicationContext != null) 
            String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
            String[] var3 = beanDefinitionNames;
            int var4 = beanDefinitionNames.length;

            label86:
            for(int var5 = 0; var5 < var4; ++var5) 
                String beanDefinitionName = var3[var5];
                Object bean = applicationContext.getBean(beanDefinitionName);
                Map annotatedMethods = null;
                //获取到加了@Xxljob注解的方法
                try 
                    annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(), new MetadataLookup<XxlJob>() 
                        public XxlJob inspect(Method method) 
                            return (XxlJob)AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        
                    );
                 catch (Throwable var19) 
                    logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", var19);
                

                if (annotatedMethods != null && !annotatedMethods.isEmpty()) 
                    Iterator var9 = annotatedMethods.entrySet().iterator();

                    while(true) 
                        Method method;
                        XxlJob xxlJob;
                        do 
                            if (!var9.hasNext()) 
                                continue label86;
                            

                            Entry<Method, XxlJob> methodXxlJobEntry = (Entry)var9.next();
                            method = (Method)methodXxlJobEntry.getKey();
                            xxlJob = (XxlJob)methodXxlJobEntry.getValue();
                         while(xxlJob == null);

                        String name = xxlJob.value();
                        if (name.trim().length() == 0) 
                            throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                        

                        if (loadJobHandler(name) != null) 
                            throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                        

                        if (method.getParameterTypes().length != 1 || !method.getParameterTypes()[0].isAssignableFrom(String.class)) 
                            throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , The correct method format like \\" public ReturnT<String> execute(String param) \\" .");
                        

                        if (!method.getReturnType().isAssignableFrom(ReturnT.class)) 
                            throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , The correct method format like \\" public ReturnT<String> execute(String param) \\" .");
                        

                        method.setAccessible(true);
                        Method initMethod = null;
                        Method destroyMethod = null;
                        if (xxlJob.init().trim().length() > 0) 
                            try 
                                initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                                initMethod.setAccessible(true);
                             catch (NoSuchMethodException var18) 
                                throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                            
                        

                        if (xxlJob.destroy().trim().length() > 0) 
                            try 
                                destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                                destroyMethod.setAccessible(true);
                             catch (NoSuchMethodException var17) 
                                throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                            
                        
                        //注册执行任务到jobHandlerRepository属性
                        registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
                    
                
            

        
    

2、注册执行器信息到调度平台,并且启动netty服务等待调度平台连接

初始化完执行任务后,就要进行执行器的注册,执行到父类的start()方法

​        

        该方法主要有日志初始化,日志清理任务初始化,RPC调用触发器回调线程启动,调度中心列表初始化以及执行器端口初始化。在走到this.initAdminBizList(this.adminAddresses, this.accessToken),初始化注册中心列表,用于后期和注册中心交互,而真正的注册执行器是在this.initEmbedServer(this.address, this.ip, this.port, this.appname, this.accessToken)方法中;

一直进入到EmbedServer的start()方法中,在这个方法中,有一段启动netty服务的代码,绑定的端口号就是之前配置好的执行器的端口号,也就是说,这里时开启了一个netty,对该端口号进行监听,等待调度平台的连接请求去执行到要执行的任务。

当调度平台触发任务时,会根据负载均衡选择一台执行器并携带着任务的参数(jobhandler或者分片序号等等)访问上图中启动的netty服务,netty执行该事件的回调方法,根据传来的参数jobhandler在之前注册过的jobHandlerRepository找到对应的执行任务的方法,执行代码。

以上是关于xxl-job的实现原理(源码分析)的主要内容,如果未能解决你的问题,请参考以下文章

XXL-JOB分布式任务调度框架-源码分析-任务调度执行流程及实现原理

XXL-JOB分布式任务调度框架-源码分析-调度中心对执行器的上下线感知实现原理

XXL-JOB分析(一任务执行的过程源码分析)

xxl-job后继任务导致前一个任务执行一半,源码分析xxljob

源码分析XXL-JOB的执行器的注册流程

分布式调度平台XXL-JOB源码分析-重试机制