线程池监控设计
Posted 恒奇恒毅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池监控设计相关的知识,希望对你有一定的参考价值。
为了提高系统吞吐量,系统使用线程池越来越多。由于缺乏对线程池的统一监控手段,运营人员不知道线程池的具体运行情况,不利于线程池的性能调优。
开发线程池监控系统,对系统中的线程池进行统一监控。
- 定义线程池统计数据接口ExecutorMonitorData,用于获取线程池的运行时数据,包括所属服务、线程池名称、线程池的唯一标识(线程池一般随系统启动而启动,随系统关闭而关闭)、核心线程数、当前线程数、历史最大线程、最大线程数、当前活跃线程数、最大空闲时间、当前任务数、已完成数量、任务执行总时间、阻塞队列、线程工厂、拒绝策略、启动时间。
/**
* 定义监控数据接口
*/
public interface ExecutorMonitorData
String IDENTIFIER_SEPARATOR = "@@";
/**
* 线程池启动时间
*/
Date getStartTime();
/**
* 线程池所在服务
*/
String getService();
/**
* 线程池名字,每个线程池有一个唯一的名字,比如HttpRequest、DbExecutor
*/
String getPoolName();
/**
* 线程池的唯一标识,因为线程池可能因为重启等因素被关闭,但是要唯一区别一个线程池,所以不能使用@link this#getPoolName(),
* 建议的格式为@link this#getService()@@@link this#getPoolName()@@@link this#getStartTime().getTime()
*/
default String getIdentifier()
return getService() + IDENTIFIER_SEPARATOR + getPoolName() + IDENTIFIER_SEPARATOR + getStartTime().getTime();
/**
* 当前核心线程数
*/
int getPoolSize();
/**
* 配置的核心线程数
*/
int getCorePoolSize();
/**
* 历史到达最大线程数
*/
int getLargestPoolSize();
/**
* 配置的最大线程数
*/
int getMaximumPoolSize();
/**
* 当前活跃线程数
*/
int getActiveCount();
/**
* 线程最大空闲时间
*/
long getKeepAliveTime(TimeUnit timeUnit);
/**
* 当前任务数
*/
long getTaskCount();
/**
* 已完成任务数
*/
long getCompletedTaskCount();
BlockingQueue<Runnable> getQueue();
ThreadFactory getThreadFactory();
RejectedExecutionHandler getRejectedExecutionHandler();
/**
* 任务执行总时间
*/
long getTotalExeTime();
- 定义一种统计数据接口的实现MonitoredThreadPoolExecutor,继承自ThreadPoolExecutor,增加没有的属性,包括所属服务、名称、启动时间、任务执行时间,复写构造器传入,任务执行时间通过复写beforeExecute和afterExecute方法统计到。
/**
* 具备监控功能的线程池,在使用@link ThreadPoolExecutor的地方替换为该类即可
*/
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor implements ExecutorMonitorData
private static final Logger logger = LoggerFactory.getLogger(MonitoredThreadPoolExecutor.class);
/**
* 任务执行时间
*/
private final ConcurrentHashMap<String, Date> runnableStartTimes;
private final String service;
private final String poolName;
private long totalExeTime = 0;
private final Date startTime;
public MonitoredThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
String service,
String poolName)
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.service = service;
this.poolName = poolName;
this.runnableStartTimes = new ConcurrentHashMap<>(maximumPoolSize);
this.startTime = new Date();
public MonitoredThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
String service,
String poolName)
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.service = service;
this.poolName = poolName;
this.runnableStartTimes = new ConcurrentHashMap<>(maximumPoolSize);
this.startTime = new Date();
public MonitoredThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler,
String service,
String poolName)
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.service = service;
this.poolName = poolName;
this.runnableStartTimes = new ConcurrentHashMap<>(maximumPoolSize);
this.startTime = new Date();
public MonitoredThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler,
String service,
String poolName)
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.service = service;
this.poolName = poolName;
this.runnableStartTimes = new ConcurrentHashMap<>(maximumPoolSize);
this.startTime = new Date();
@Override
public void shutdown()
logger.info(" Going to shutdown. Executed tasks: , Running tasks: , Pending tasks: ", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
super.shutdown();
@Override
public List<Runnable> shutdownNow()
logger.info(" Going to immediately shutdown. Executed tasks: , Running tasks: , Pending tasks: ", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
return super.shutdownNow();
@Override
protected void beforeExecute(Thread t, Runnable r)
this.runnableStartTimes.put(String.valueOf(r.hashCode()), new Date());
@Override
protected void afterExecute(Runnable r, Throwable t)
//这里一定要remove,否则可能导致内存泄露
Date startDate = this.runnableStartTimes.remove(String.valueOf(r.hashCode()));
long diff = System.currentTimeMillis() - startDate.getTime();
this.totalExeTime += diff;
@Override
public long getTotalExeTime()
return this.totalExeTime;
@Override
public Date getStartTime()
return this.startTime;
@Override
public String getPoolName()
return this.poolName;
@Override
public String getService()
return this.service;
- 针对某些情况我们无法直接定义线程池,即第二种方式无法实现的时候,我们可以代理线程池。定义ThreadPoolExecutorMonitoredDelegator,构造器传入ThreadPoolExecutor进行代理,增加没有的属性,包括所属服务、名称、启动时间,这种情况下无法获取到任务执行时间。
/**
* 代理线程池@link ThreadPoolExecutor,附加监控相关信息
*/
public class ThreadPoolExecutorMonitoredDelegator implements ExecutorMonitorData
private final ThreadPoolExecutor delegate;
private final String service;
private final String poolName;
private final Date startTime;
public ThreadPoolExecutorMonitoredDelegator(ThreadPoolExecutor threadPoolExecutor, String service, String poolName)
this.delegate = threadPoolExecutor;
this.service = service;
this.poolName = poolName;
this.startTime = new Date();
@Override
public Date getStartTime()
return this.startTime;
@Override
public String getPoolName()
return this.poolName;
@Override
public String getService()
return this.service;
@Override
public long getTotalExeTime()
//无法监控到执行时间,返回特定标识
return -1;
@Override
public int getActiveCount()
return this.delegate.getActiveCount();
@Override
public int getMaximumPoolSize()
return this.delegate.getMaximumPoolSize();
@Override
public BlockingQueue<Runnable> getQueue()
return this.delegate.getQueue();
@Override
public long getCompletedTaskCount()
return this.delegate.getCompletedTaskCount();
@Override
public int getPoolSize()
return this.delegate.getPoolSize();
@Override
public int getCorePoolSize()
return this.delegate.getCorePoolSize();
@Override
public int getLargestPoolSize()
return this.delegate.getLargestPoolSize();
@Override
public long getKeepAliveTime(TimeUnit timeUnit)
return this.delegate.getKeepAliveTime(timeUnit);
@Override
public long getTaskCount()
return this.delegate.getTaskCount();
@Override
public ThreadFactory getThreadFactory()
return this.delegate.getThreadFactory();
@Override
public RejectedExecutionHandler getRejectedExecutionHandler()
return this.delegate.getRejectedExecutionHandler();
- 定义用于上报监控信息的接口ExecutorReportService,定义两个方法,分别在线程池启动的时候调用initReport(主要用于收集线程池的参数配置,属于静态参数)和系统定时调用report(主要用于收集线程池的动态运行参数,属于动态参数)。
/**
* 初始化线程池的时候调用,上报线程初始化相关信息
*/
public interface ExecutorReportService
/**
* 线程池初始化的时候调用上报初始化信息
* @param executorMonitorData 被监控线程池初始化信息
*/
void initReport(ExecutorMonitorData executorMonitorData);
/**
* 普通上报信息,定时或者手动触发
* @param executorMonitorData 被监控线程池监控信息
*/
void report(ExecutorMonitorData executorMonitorData);
- 因为系统中可能有很多的上报具体实现,我们定义一个聚合的CompositeExecutorReportService,用于管理ExecutorReportService,它本身也可以看作为一个ExecutorReportService。
/**
* 组合service,系统中可能需要多种处理
*/
public class CompositeExecutorReportService implements ExecutorReportService
private final List<ExecutorReportService> reportServices = new ArrayList<>();
public CompositeExecutorReportService(List<ExecutorReportService> reportServices)
this.reportServices.addAll(reportServices);
public CompositeExecutorReportService(ExecutorReportService... reportServices)
this.reportServices.addAll(Arrays.asList(reportServices));
public CompositeExecutorReportService addServices(ExecutorReportService... reportServices)
this.reportServices.addAll(Arrays.asList(reportServices));
return this;
public List<ExecutorReportService> getReportServices()
return this.reportServices;
@Override
public void initReport(ExecutorMonitorData executorMonitorData)
for (ExecutorReportService reportService : this.reportServices)
reportService.initReport(executorMonitorData);
@Override
public void report(ExecutorMonitorData executorMonitorData)
for (ExecutorReportService reportService : this.reportServices)
reportService.report(executorMonitorData);
- 定义系统的监控服务ExecutorMonitorService,构造器传入ExecutorReportService,然后启动一个定时任务每十分钟收集一次系统的线程池的统计信息。线程池通过该类的register方法进行注册。
/**
* 线程池监控定时任务,定时上报线程池相关数据
*/
public class ExecutorMonitorService implements Closeable
private static final Logger logger = LoggerFactory.getLogger(ExecutorMonitorService.class);
public static final long DEFAULT_SCHEDULER_PERIOD = 10;
private final List<ExecutorMonitorData> executorMonitorDataList = new ArrayList<>();
private final ScheduledExecutorService scheduledExecutorService;
public ExecutorMonitorService(long initDelay, long schedulerPeriod, ExecutorReportService... reportServices)
//首先注册reportService,否则register线程池的时候还没有reportService,就会丢失初始化数据
addReportService(reportServices);
this.scheduledExecutorService = initScheduledExecutorService(initDelay, schedulerPeriod);
public ExecutorMonitorService(ExecutorReportService... reportServices)
//首先注册reportService,否则register线程池的时候还没有reportService,就会丢失初始化数据
addReportService(reportServices);
this.scheduledExecutorService = initScheduledExecutorService(DEFAULT_SCHEDULER_PERIOD, DEFAULT_SCHEDULER_PERIOD);
private ScheduledExecutorService initScheduledExecutorService(long initDelay, long schedulerPeriod)
ThreadFactory threadFactory = ThreadFactoryBuilder.create()
.setNameFormat("MonitorService-Thread-%d")
.setDaemon(true)
//.setThreadFactory(Thread::new)
.build();
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);
scheduledThreadPoolExecutor.scheduleAtFixedRate(this::reportMonitorData, initDelay, schedulerPeriod, TimeUnit.MINUTES);
return scheduledThreadPoolExecutor;
/**
* 注册监控线程池
*/
public void register(ExecutorMonitorData... executorMonitorDataList)
for (ExecutorMonitorData executorMonitorData : executorMonitorDataList)
//注册
this.executorMonitorDataList.add(executorMonitorData);
//上报初始化信息
this.compositeExecutorReportService.initReport(executorMonitorData)Java线程池监控小结