线程池监控设计

Posted 恒奇恒毅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池监控设计相关的知识,希望对你有一定的参考价值。

    为了提高系统吞吐量,系统使用线程池越来越多。由于缺乏对线程池的统一监控手段,运营人员不知道线程池的具体运行情况,不利于线程池的性能调优。
   开发线程池监控系统,对系统中的线程池进行统一监控。
  1. 定义线程池统计数据接口ExecutorMonitorData,用于获取线程池的运行时数据,包括所属服务、线程池名称、线程池的唯一标识(线程池一般随系统启动而启动,随系统关闭而关闭)、核心线程数、当前线程数、历史最大线程、最大线程数、当前活跃线程数、最大空闲时间、当前任务数、已完成数量、任务执行总时间、阻塞队列、线程工厂、拒绝策略、启动时间。
/**
 * 定义监控数据接口
 */
public interface ExecutorMonitorData 
	String IDENTIFIER_SEPARATOR = "@@";
	/**
	 * 线程池启动时间
	 */
	Date getStartTime();

	/**
	 * 线程池所在服务
	 */
	String getService();

	/**
	 * 线程池名字,每个线程池有一个唯一的名字,比如HttpRequest、DbExecutor
	 */
	String getPoolName();

	/**
	 * 线程池的唯一标识,因为线程池可能因为重启等因素被关闭,但是要唯一区别一个线程池,所以不能使用@link this#getPoolName(),
	 * 建议的格式为@link this#getService()@@@link this#getPoolName()@@@link this#getStartTime().getTime()
	 */
	default String getIdentifier() 
		return getService() + IDENTIFIER_SEPARATOR + getPoolName() + IDENTIFIER_SEPARATOR + getStartTime().getTime();
	

	/**
	 * 当前核心线程数
	 */
	int getPoolSize();

	/**
	 * 配置的核心线程数
	 */
	int getCorePoolSize();

	/**
	 * 历史到达最大线程数
	 */
	int getLargestPoolSize();

	/**
	 * 配置的最大线程数
	 */
	int getMaximumPoolSize();

	/**
	 * 当前活跃线程数
	 */
	int getActiveCount();

	/**
	 * 线程最大空闲时间
	 */
	long getKeepAliveTime(TimeUnit timeUnit);

	/**
	 * 当前任务数
	 */
	long getTaskCount();

	/**
	 * 已完成任务数
	 */
	long getCompletedTaskCount();

	BlockingQueue<Runnable> getQueue();

	ThreadFactory getThreadFactory();

	RejectedExecutionHandler getRejectedExecutionHandler();

	/**
	 * 任务执行总时间
	 */
	long getTotalExeTime();


  1. 定义一种统计数据接口的实现MonitoredThreadPoolExecutor,继承自ThreadPoolExecutor,增加没有的属性,包括所属服务、名称、启动时间、任务执行时间,复写构造器传入,任务执行时间通过复写beforeExecute和afterExecute方法统计到。
/**
 * 具备监控功能的线程池,在使用@link ThreadPoolExecutor的地方替换为该类即可
 */
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor implements ExecutorMonitorData
    private static final Logger logger = LoggerFactory.getLogger(MonitoredThreadPoolExecutor.class);
    /**
     * 任务执行时间
     */
    private final ConcurrentHashMap<String, Date> runnableStartTimes;
    private final String service;
    private final String poolName;
    private long totalExeTime = 0;
    private final Date startTime;

    public MonitoredThreadPoolExecutor(int corePoolSize,
                                       int maximumPoolSize,
                                       long keepAliveTime,
                                       TimeUnit unit,
                                       BlockingQueue<Runnable> workQueue,
                                       String service,
                                       String poolName) 
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.service = service;
        this.poolName = poolName;
        this.runnableStartTimes = new ConcurrentHashMap<>(maximumPoolSize);
        this.startTime = new Date();
    

    public MonitoredThreadPoolExecutor(int corePoolSize,
                                       int maximumPoolSize,
                                       long keepAliveTime,
                                       TimeUnit unit,
                                       BlockingQueue<Runnable> workQueue,
                                       ThreadFactory threadFactory,
                                       String service,
                                       String poolName) 
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.service = service;
        this.poolName = poolName;
        this.runnableStartTimes = new ConcurrentHashMap<>(maximumPoolSize);
        this.startTime = new Date();
    

    public MonitoredThreadPoolExecutor(int corePoolSize,
                                       int maximumPoolSize,
                                       long keepAliveTime,
                                       TimeUnit unit,
                                       BlockingQueue<Runnable> workQueue,
                                       RejectedExecutionHandler handler,
                                       String service,
                                       String poolName) 
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.service = service;
        this.poolName = poolName;
        this.runnableStartTimes = new ConcurrentHashMap<>(maximumPoolSize);
        this.startTime = new Date();
    

    public MonitoredThreadPoolExecutor(int corePoolSize,
                                       int maximumPoolSize,
                                       long keepAliveTime,
                                       TimeUnit unit,
                                       BlockingQueue<Runnable> workQueue,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler,
                                       String service,
                                       String poolName) 
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.service = service;
        this.poolName = poolName;
        this.runnableStartTimes = new ConcurrentHashMap<>(maximumPoolSize);
        this.startTime = new Date();
    

    @Override
    public void shutdown() 
        logger.info(" Going to shutdown. Executed tasks: , Running tasks: , Pending tasks: ", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        super.shutdown();
    
    @Override
    public List<Runnable> shutdownNow() 
        logger.info(" Going to immediately shutdown. Executed tasks: , Running tasks: , Pending tasks: ", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        return super.shutdownNow();
    

    @Override
    protected void beforeExecute(Thread t, Runnable r) 
        this.runnableStartTimes.put(String.valueOf(r.hashCode()), new Date());
    
    @Override
    protected void afterExecute(Runnable r, Throwable t) 
        //这里一定要remove,否则可能导致内存泄露
        Date startDate = this.runnableStartTimes.remove(String.valueOf(r.hashCode()));
        long diff = System.currentTimeMillis() - startDate.getTime();
        this.totalExeTime += diff;
    

    @Override
    public long getTotalExeTime() 
        return this.totalExeTime;
    
    @Override
    public Date getStartTime()
        return this.startTime;
    
    @Override
    public String getPoolName() 
        return this.poolName;
    

    @Override
    public String getService() 
        return this.service;
    

  1. 针对某些情况我们无法直接定义线程池,即第二种方式无法实现的时候,我们可以代理线程池。定义ThreadPoolExecutorMonitoredDelegator,构造器传入ThreadPoolExecutor进行代理,增加没有的属性,包括所属服务、名称、启动时间,这种情况下无法获取到任务执行时间。
/**
 * 代理线程池@link ThreadPoolExecutor,附加监控相关信息
 */
public class ThreadPoolExecutorMonitoredDelegator implements ExecutorMonitorData
    private final ThreadPoolExecutor delegate;
    private final String service;
    private final String poolName;
    private final Date startTime;

    public ThreadPoolExecutorMonitoredDelegator(ThreadPoolExecutor threadPoolExecutor, String service, String poolName) 
        this.delegate = threadPoolExecutor;
        this.service = service;
        this.poolName = poolName;
        this.startTime = new Date();
    

    @Override
    public Date getStartTime()
        return this.startTime;
    
    @Override
    public String getPoolName() 
        return this.poolName;
    

    @Override
    public String getService() 
        return this.service;
    

    @Override
    public long getTotalExeTime() 
        //无法监控到执行时间,返回特定标识
        return -1;
    

    @Override
    public int getActiveCount() 
        return this.delegate.getActiveCount();
    

    @Override
    public int getMaximumPoolSize() 
        return this.delegate.getMaximumPoolSize();
    

    @Override
    public BlockingQueue<Runnable> getQueue() 
        return this.delegate.getQueue();
    

    @Override
    public long getCompletedTaskCount() 
        return this.delegate.getCompletedTaskCount();
    

    @Override
    public int getPoolSize() 
        return this.delegate.getPoolSize();
    

    @Override
    public int getCorePoolSize() 
        return this.delegate.getCorePoolSize();
    

    @Override
    public int getLargestPoolSize() 
        return this.delegate.getLargestPoolSize();
    

    @Override
    public long getKeepAliveTime(TimeUnit timeUnit) 
        return this.delegate.getKeepAliveTime(timeUnit);
    

    @Override
    public long getTaskCount() 
        return this.delegate.getTaskCount();
    

    @Override
    public ThreadFactory getThreadFactory() 
        return this.delegate.getThreadFactory();
    

    @Override
    public RejectedExecutionHandler getRejectedExecutionHandler() 
        return this.delegate.getRejectedExecutionHandler();
    


  1. 定义用于上报监控信息的接口ExecutorReportService,定义两个方法,分别在线程池启动的时候调用initReport(主要用于收集线程池的参数配置,属于静态参数)和系统定时调用report(主要用于收集线程池的动态运行参数,属于动态参数)。
/**
 * 初始化线程池的时候调用,上报线程初始化相关信息
 */
public interface ExecutorReportService 
    /**
     * 线程池初始化的时候调用上报初始化信息
     * @param executorMonitorData 被监控线程池初始化信息
     */
    void initReport(ExecutorMonitorData executorMonitorData);
    /**
     * 普通上报信息,定时或者手动触发
     * @param executorMonitorData 被监控线程池监控信息
     */
    void report(ExecutorMonitorData executorMonitorData);

  1. 因为系统中可能有很多的上报具体实现,我们定义一个聚合的CompositeExecutorReportService,用于管理ExecutorReportService,它本身也可以看作为一个ExecutorReportService。
/**
 * 组合service,系统中可能需要多种处理
 */
public class CompositeExecutorReportService implements ExecutorReportService
	private final List<ExecutorReportService> reportServices = new ArrayList<>();

	public CompositeExecutorReportService(List<ExecutorReportService> reportServices) 
		this.reportServices.addAll(reportServices);
	
	public CompositeExecutorReportService(ExecutorReportService... reportServices) 
		this.reportServices.addAll(Arrays.asList(reportServices));
	

	public CompositeExecutorReportService addServices(ExecutorReportService... reportServices)
		this.reportServices.addAll(Arrays.asList(reportServices));
		return this;
	

	public List<ExecutorReportService> getReportServices() 
		return this.reportServices;
	

	@Override
	public void initReport(ExecutorMonitorData executorMonitorData) 
		for (ExecutorReportService reportService : this.reportServices) 
			reportService.initReport(executorMonitorData);
		
	

	@Override
	public void report(ExecutorMonitorData executorMonitorData) 
		for (ExecutorReportService reportService : this.reportServices) 
			reportService.report(executorMonitorData);
		
	


  1. 定义系统的监控服务ExecutorMonitorService,构造器传入ExecutorReportService,然后启动一个定时任务每十分钟收集一次系统的线程池的统计信息。线程池通过该类的register方法进行注册。
/**
 * 线程池监控定时任务,定时上报线程池相关数据
 */
public class ExecutorMonitorService implements Closeable 
    private static final Logger logger = LoggerFactory.getLogger(ExecutorMonitorService.class);
    public static final long DEFAULT_SCHEDULER_PERIOD = 10;
    private final List<ExecutorMonitorData> executorMonitorDataList = new ArrayList<>();
    private final ScheduledExecutorService scheduledExecutorService;

    public ExecutorMonitorService(long initDelay, long schedulerPeriod, ExecutorReportService... reportServices) 
        //首先注册reportService,否则register线程池的时候还没有reportService,就会丢失初始化数据
        addReportService(reportServices);
        this.scheduledExecutorService = initScheduledExecutorService(initDelay, schedulerPeriod);
    

    public ExecutorMonitorService(ExecutorReportService... reportServices) 
        //首先注册reportService,否则register线程池的时候还没有reportService,就会丢失初始化数据
        addReportService(reportServices);
        this.scheduledExecutorService = initScheduledExecutorService(DEFAULT_SCHEDULER_PERIOD, DEFAULT_SCHEDULER_PERIOD);
    

    private ScheduledExecutorService initScheduledExecutorService(long initDelay, long schedulerPeriod)
        ThreadFactory threadFactory = ThreadFactoryBuilder.create()
                .setNameFormat("MonitorService-Thread-%d")
                .setDaemon(true)
                //.setThreadFactory(Thread::new)
                .build();
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);

        scheduledThreadPoolExecutor.scheduleAtFixedRate(this::reportMonitorData, initDelay, schedulerPeriod, TimeUnit.MINUTES);

        return scheduledThreadPoolExecutor;
    
    /**
     * 注册监控线程池
     */
    public void register(ExecutorMonitorData... executorMonitorDataList)
        for (ExecutorMonitorData executorMonitorData : executorMonitorDataList) 
            //注册
            this.executorMonitorDataList.add(executorMonitorData);
            //上报初始化信息
            this.compositeExecutorReportService.initReport(executorMonitorData)Java线程池监控小结

java 如何获得线程池中正在执行的线程数?

遇见这么刁钻的面试题怎么办???Java怎么利用线程工厂监控线程池

java线程池的自带监控,线程池返回值的控制

如何使用Spring开发和监控线程池服务

Java线程池必备知识点:工作流程常见参数调优监控