XxlJob XxlJob中的多线程

Posted Dream_it_possible!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了XxlJob XxlJob中的多线程相关的知识,希望对你有一定的参考价值。

XxlJob(一) 分布式定时任务XxlJob用法及核心调度源码详解_Dream_it_possible!的博客-CSDN博客_xxl-job 核心原理

XxlJob(二) 负载均衡用法及实现原理详解_Dream_it_possible!的博客-CSDN博客

目录

前期回顾

一、Volatile关键字的应用

JobThread的启动与停止

二、Daemon线程的使用

三、InheritableThreadLocal 传递线程上下文

XxlJobContext

四、快慢线程池的巧用


前期回顾

梳理一下前二章的内容:

第一章讲解了一个Job的生命周期和核心调度流程

1)  JobHandler的创建、注册。 容器启动时会扫描并注册所有@XxlJob 标记的bean, 此操作伴随着Spring 容器启动实现注册。

  private void initJobHandlerMethodRepository(List<Object> xxlJobBeanList) 
        if (xxlJobBeanList==null || xxlJobBeanList.size()==0) 
            return;
        

        // init job handler from method
        for (Object bean: xxlJobBeanList) 
            // method
            Method[] methods = bean.getClass().getDeclaredMethods();
            if (methods.length == 0) 
                continue;
            
            for (Method executeMethod : methods) 
                XxlJob xxlJob = executeMethod.getAnnotation(XxlJob.class);
                // registry
                registJobHandler(xxlJob, bean, executeMethod);
            

        

    

2)  任务触发。任务触发分为手动触发和自动触发两种方式。自动触发通过scheduler实现,admin server里的JobScheduleHelper启动一个schedule线程从数据库表xxl_job_info 取任务放时间轮里,同时启动一个ringThread从时间轮里取任务,最终通过XxlJobTrigger 去触发,调用executor 组件的/run方法。XxlJobScheduler伴随着Spring 容器启动,其中JobTriggerPoolHelper里的addTrigger方法通过线程池异步提交trigger,提升效率。

// do trigger
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);

3) 发起请求调度executor。XxlJobTrigger的processTrigger方法实现了远程调用executor的run接口。

 // 4、trigger remote executor
        ReturnT<String> triggerResult = null;
        if (address != null) 
            triggerResult = runExecutor(triggerParam, address);
         else 
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        

4) 注册JobThread并启动Job。在registJobThread方法里,将jobId和methodHanlder交给jobThread, 注册的同时启动JobThread, 最终通过MethodJobHandler执行目标方法。

 public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason)
        JobThread newJobThread = new JobThread(jobId, handler);
        newJobThread.start();
        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:, handler:", new Object[]jobId, handler);

        JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
        if (oldJobThread != null) 
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        

        return newJobThread;
    

第二篇文章讲了XxlJob的负载均衡策略。

1)  根据应用名配置多个实例的executor应用,只要保证应用名相同即可。

2)  任务的配置策略为轮询,注册成功后,启动任务。

3)  机器实例动态伸缩。

4) 负载均衡策略实现。根据配置的route key 选择指定的策略,此处用到了设计模式中的策略模式;在计算Server时使用AtomicInteger实现原子递增,通过addAndGet(1)记录一个JobId的执行次数,再取模serverList的size就能轮询得到所有的server。

一、Volatile关键字的应用

        在Java中可以使用volatile 关键字来标记一个共享变量,能实现多线程环境中的可见性,在多线程执行的过程中,如果有一个线程修改过该共享变量,那么其他线程会通过CPU的总线嗅探机制感知到该变量的update。

        接下来看下XxlJob中应用到Volatile关键字的地方。

JobThread的启动与停止

        前面提到在注册JobThread的同时启动了JobThread

  JobThread newJobThread = new JobThread(jobId, handler);
 newJobThread.start();

        同时停掉旧的JobThread线程将toStap 属性置为true。

  JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent 
        if (oldJobThread != null) 
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        

         启动和停止的过程都使用了一个属性toStop, toStop属性被volatile关键字修饰。

    private volatile boolean toStop = false;

         volatile能保证共享变量在多线程中的可见性,那么问题来了,JobThread是如何保证一个jo只成功执行一次,使用while(!toStop) 给一个Job创造一个可重复执行的环境,直到执行成功。

  @Override
    public void run() 
        // 执行方法的生命周期, 例如@XxlJob(value="",init="",destroy="")
        // init, 调用 handler里的init()方法,
        try 
            handler.init();
         catch (Throwable e) 
            logger.error(e.getMessage(), e);
        

        // execute
        while (!toStop) 
            running = false;
            idleTimes++;

二、Daemon线程的使用

         Java中的Daemon线程是为用户线程服务的,可以理解为后台线程,随着JVM的停止而终止, 在线程启动前通过setDaemon(true) 值表示该线程是一个后台线程,线程的退出先后顺序如下: 

user thread leave >>> daemon leave >>> jvm leave

        XxlJob的executor组件里使用到了很多Daemon线程,主要的罗列如下:

线程名       功能描述
TriggerCallBackThreadtriggerCallBackThread trigger回调线程
TriggerCallBackThreadtriggerRetryCallBackThreadtrigger 回调重试线程,用于回调失败后的重试。
ExecutorRegistryThreadregistryThread自动注册executor的ip地址到xxl_job_registry数表里,此线程在executor的内置server启动时开启。
JobLogFileCleanThreadlocalThread清理日志文件的后台线程,默认是关闭的线程,可设置executor的属性logRetentionDays值大于3时,该线程才会开启。

        这些daemon线程为应用提供一些后台服务,执行一些耗时操作,有效的利用了系统资源。

三、InheritableThreadLocal 传递线程上下文

        InheritableThreadLocal 是一个可以由子线程继承属性值的ThreadLocal,  XxlJobContext 使用到了InheritableThreadLocal, 保证线程上下文传递时的属性是线程自有的。

XxlJobContext

        在Jobhandler执行execute()之前到执行结束,XxlJobContext 都可以一直为该线程所用:

   XxlJobContext xxlJobContext = new XxlJobContext(
                            triggerParam.getJobId(),
                            triggerParam.getExecutorParams(),
                            logFileName,
                            triggerParam.getBroadcastIndex(),
                            triggerParam.getBroadcastTotal());

                    // init job context
                    XxlJobContext.setXxlJobContext(xxlJobContext);

        每个线程都持有一个XxlJobContext,在JobThread的run()方法执行前初始化:

   private static InheritableThreadLocal<XxlJobContext> contextHolder = new InheritableThreadLocal<XxlJobContext>(); // support for child thread of job handler)

    public static void setXxlJobContext(XxlJobContext xxlJobContext)
        contextHolder.set(xxlJobContext);
    

    public static XxlJobContext getXxlJobContext()
        return contextHolder.get();
    

        同时使用XxlJobContext存放执行job的code和msg。

        这样一来,不管这个Job有没有执行成功,都能通过XxlJobContext去传递code和msg, 最终写入到数据库。 

四、快慢线程池的巧用

        线程池是Java多线程中使用最多的工具之一,我们只需要创建一个pool, 那么后面可以从pool中持续的拿到线程,线程的创建等工作直接交给线程池去管理,值得一提的是,在XxlJob的admin server里JobTriggerPoolHelper 初始化了2个线程池,一个为fastTriggerPool, 另一个为slowTriggerPool。

    // fast/slow thread pool
    private ThreadPoolExecutor fastTriggerPool = null;
    private ThreadPoolExecutor slowTriggerPool = null;

    public void start()
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() 
                    @Override
                    public Thread newThread(Runnable r) 
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    
                );

        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() 
                    @Override
                    public Thread newThread(Runnable r) 
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    
                );
    

        默认的快线程池最大核心数为200,慢线程池最大核心线程数为100,其中慢线程池可以堆积的任务为2000。

        如果一个JobId的对应的任务执行间超过500ms,那么该任务视为一个慢任务,并用jobTimeOutCountMap记录,如果超过10次执行都超过了500ms,那么后续执行该JobId选用慢线程池。

    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10)       // job-timeout 10 times in 1 min
            triggerPool_ = slowTriggerPool;
        

        通过快慢线程池的选用,用时短的任务, XxlJob就用快线程池同时多执行些,用时长的任务,XxlJob就用慢线程池, 快慢线程池的使用能最大化的利用系统资源去处理耗时任务和非耗时任务。

以上是关于XxlJob XxlJob中的多线程的主要内容,如果未能解决你的问题,请参考以下文章

XxlJob 分布式定时任务XxlJob用法及核心调度源码详解

分布式定时任务XxlJob用法及核心调度源码详解

项目实战典型案例24.xxljob控制台不打印日志排查

xxl-job后继任务导致前一个任务执行一半,源码分析xxljob

XxlJob 负载均衡用法及实现原理详解

SpringBoot项目集成xxljob全纪录(图文详解)