ScheduledThreadPoolExecutor定时任务执行线程池分析

Posted 好大的月亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ScheduledThreadPoolExecutor定时任务执行线程池分析相关的知识,希望对你有一定的参考价值。

概述

ScheduledThreadPoolExecutor自然是继承了ThreadPoolExecutor,那么它也就是一个被定义了特定功能的线程池而已,本质上就是一个ThreadPoolExecutor。

代码分析

可以看到其继承了ThreadPoolExecutor,在new ScheduledThreadPoolExecutor实例的时候,调用的是父类的构造函数。

在new的时候其传入了初始核心线程数,最大线程数为Integer.MAX_VALUE,线程空闲存活时间为0,默认的拒绝策略是抛异常给调用线程。

这里传入的队列是DelayedWorkQueue
顾名思义就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。

图例分析

继承关系

实例化调用父类构造

父类构造方法

默认的线程池拒绝策略是抛异常给调用线程

传入的队列是DelayedWorkQueue
会自动增长容量,最大是Integer.MAX_VALUE。所以正常情况下,线程池最多就保持corePoolSize数量的活跃线程

DelayedWorkQueue

顾名思义就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。
核心数据结构是二叉最小堆的优先队列.其容量会动态增长(增长的时候每次增加50%的容量)。

DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueuePriorityQueue.在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面(注意:这里的顺序并不是绝对的,堆中的排序只保证了子节点的下次执行时间要比父节点的下次执行时间要大,而叶子节点之间并不一定是顺序的)。


用数组来表示就是这样的

在这种结构中,可以发现有如下特性:
假设第一个元素”在数组中的下标为0的话,则父结点和子结点的位置关系如下:

索引为 的左孩子的索引是 (2∗i+1)(2 * i + 1)(2∗i+1)
索引为 的右孩子的索引是 (2∗i+2)(2 * i + 2)(2∗i+2)
索引为 的父结点的索引是 floor((i−1)/2)floor((i-1)/2)floor((i−1)/2)

代码demo

ScheduledThreadPoolExecutor中的
schedule:一次性定时任务
scheduleWithFixedDelay:周期性执行,下次执行时间=上一次任务从执行到结束所需要的时间+给定的间隔时间。

scheduleAtFixedRate:周期性执行,创建一个给定初始延迟的间隔性的任务,之后的每次任务执行时间为 初始延迟 + N * delay(间隔) 。这里的N为首次任务执行之后的第N个任务,N从1开始。比如首次执行任务的时间为12:00 那么下次任务的执行时间是固定的 是12:01 下下次为12:02。与scheduleWithFixedDelay最大的区别就是 ,scheduleAtFixedRate 不受任务执行时间的影响。

获取内容

package com.xxx.xxx;

import com.cloud.common.core.enums.system.UserQueryType;
import com.xxx.xxx.xxx.SysUserService;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.*;

@Component
public class QueryTypeContext implements InitializingBean 


    private static SysUserService sysUserService;

    @Value("$queryType.refresh.threadCount:8")
    private Integer threadCount;

    public static ScheduledExecutorService executorService;

    private static List<String> qcRoleKeys;

    private static Map<String,QueryTypeRefreshTask> refreshQueryType = new ConcurrentHashMap<>();



    @Resource
    public void setSysUserService(SysUserService sysUserService) 
        QueryTypeContext.sysUserService = sysUserService;
    


    public static List<String> getQcRoleKeys() 
        if(CollectionUtils.isEmpty(qcRoleKeys))
            synchronized (QueryTypeContext.class)
                if(CollectionUtils.isEmpty(qcRoleKeys))
                    qcRoleKeys = getSyncRoleKeysByQueryType(UserQueryType.QC.getCode());
                else 
                    return qcRoleKeys;
                
            
        
        return qcRoleKeys;
    

    public static void setQcRoleKeys(List<String> qcRoleKeys) 
        QueryTypeContext.qcRoleKeys = qcRoleKeys;
    

    private static List<String> getSyncRoleKeysByQueryType(String queryType)
        startRefreshConfig(queryType);
        return sysUserService.getSyncRoleKeysByQueryType(queryType);
    

    private static void startRefreshConfig(String type)
        QueryTypeRefreshTask queryTypeRefreshTask = refreshQueryType.get(type);
        if(Objects.isNull(queryTypeRefreshTask))
            synchronized (QueryTypeContext.class)
                queryTypeRefreshTask = refreshQueryType.get(type);
                if(Objects.isNull(queryTypeRefreshTask))
                    QueryTypeRefreshTask task = new QueryTypeRefreshTask(sysUserService, type);
                    //一次性任务,按照指定时间延时
                    //executorService.schedule(task, 10, TimeUnit.SECONDS);
                    //周期性执行任务,按照指定时间周期性执行
                    executorService.scheduleWithFixedDelay(task, 10, 10,TimeUnit.SECONDS);
                    refreshQueryType.put(type, task);
                
            
        
    


    @Override
    public void afterPropertiesSet() throws Exception 
        QueryTypeContext.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() 
            @Override
            public Thread newThread(Runnable r) 
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.cloud.system#refresh");
                return thread;
            
        );
    

具体任务

package com.xx.xx;

import com.xx.xx.xx.SysUserService;

public class QueryTypeRefreshTask implements Runnable

    private SysUserService sysUserService;

    private String type;

    public QueryTypeRefreshTask() 
    

    public QueryTypeRefreshTask(SysUserService sysUserService, String type)
        this.sysUserService = sysUserService;
        this.type = type;
    

    @Override
    public void run() 

        QueryTypeContext.setQcRoleKeys(
                sysUserService.getSyncRoleKeysByQueryType(type)
        );

    

以上是关于ScheduledThreadPoolExecutor定时任务执行线程池分析的主要内容,如果未能解决你的问题,请参考以下文章