Java线程池监控小结

Posted dqVoice

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java线程池监控小结相关的知识,希望对你有一定的参考价值。

最近我们组杨青同学遇到一个使用线程池不当的问题:异步处理的线程池线程将主线程hang住了,分析代码发现是线程池的拒绝策略设置得不合理,设置为CallerRunsPolicy。当异步线程的执行效率降低时,阻塞队列满了,触发了拒绝策略,进而导致主线程hang死。

从这个问题中,我们学到了两点:

  • 线程池的使用,需要充分分析业务场景后作出选择,必要的情况下需要自定义线程池;

  • 线程池的运行状况,也需要监控

关于线程池的监控,我参考了《Java编程的艺术》中提供的思路实现的,分享下我的代码片段,如下:

 
   
   
 
  1. public class AsyncThreadExecutor implements AutoCloseable {

  2.    private static final int DEFAULT_QUEUE_SIZE = 1000;

  3.    private static final int DEFAULT_POOL_SIZE = 10;

  4.    @Setter

  5.    private int queueSize = DEFAULT_QUEUE_SIZE;

  6.    @Setter

  7.    private int poolSize = DEFAULT_POOL_SIZE;

  8.    /**

  9.     * 用于周期性监控线程池的运行状态

  10.     */

  11.    private final ScheduledExecutorService scheduledExecutorService =

  12.        Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("async thread executor monitor").build());

  13.    /**

  14.     * 自定义异步线程池

  15.     * (1)任务队列使用有界队列

  16.     * (2)自定义拒绝策略

  17.     */

  18.    private final ThreadPoolExecutor threadPoolExecutor =

  19.        new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(queueSize),

  20.                               new BasicThreadFactory.Builder().namingPattern("async-thread-%d").build(),

  21.                               (r, executor) -> log.error("the async executor pool is full!!"));

  22.    private final ExecutorService executorService = threadPoolExecutor;

  23.    @PostConstruct

  24.    public void init() {

  25.        scheduledExecutorService.scheduleAtFixedRate(() -> {

  26.            /**

  27.             * 线程池需要执行的任务数

  28.             */

  29.            long taskCount = threadPoolExecutor.getTaskCount();

  30.            /**

  31.             * 线程池在运行过程中已完成的任务数

  32.             */

  33.            long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();

  34.            /**

  35.             * 曾经创建过的最大线程数

  36.             */

  37.            long largestPoolSize = threadPoolExecutor.getLargestPoolSize();

  38.            /**

  39.             * 线程池里的线程数量

  40.             */

  41.            long poolSize = threadPoolExecutor.getPoolSize();

  42.            /**

  43.             * 线程池里活跃的线程数量

  44.             */

  45.            long activeCount = threadPoolExecutor.getActiveCount();

  46.            log.info("async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{}",

  47.                     taskCount, completedTaskCount, completedTaskCount, largestPoolSize, poolSize, activeCount);

  48.        }, 0, 10, TimeUnit.MINUTES);

  49.    }

  50.    public void execute(Runnable task) {

  51.        executorService.execute(task);

  52.    }

  53.    @Override

  54.    public void close() throws Exception {

  55.        executorService.shutdown();

  56.    }

  57. }

这里的主要思路是:(1)使用有界队列的固定数量线程池;(2)拒绝策略是将任务丢弃,但是需要记录错误日志;(3)使用一个调度线程池对业务线程池进行监控。

在查看监控日志的时候,看到下图所示的监控日志:

这里我对largestPooSize的含义比较困惑,按字面理解是“最大的线程池数量”,但是按照线程池的定义,maximumPoolSize和coreSize相同的时候(在这里,都是10),一个线程池里的最大线程数是10,那么为什么largestPooSize可以是39呢?我去翻这块的源码:

 
   
   
 
  1.    /**

  2.     * Returns the largest number of threads that have ever

  3.     * simultaneously been in the pool.

  4.     *

  5.     * @return the number of threads

  6.     */

  7.    public int getLargestPoolSize() {

  8.        final ReentrantLock mainLock = this.mainLock;

  9.        mainLock.lock();

  10.        try {

  11.            return largestPoolSize;

  12.        } finally {

  13.            mainLock.unlock();

  14.        }

  15.    }

注释的翻译是:返回在这个线程池里曾经同时存在过的线程数。再看这个变量largestPoolSize在ThreadExecutor中的赋值的地方,代码如下:

 
   
   
 
  1. private boolean addWorker(Runnable firstTask, boolean core) {

  2.        retry:

  3.        for (;;) {

  4.            int c = ctl.get();

  5.            int rs = runStateOf(c);

  6.            // Check if queue empty only if necessary.

  7.            if (rs >= SHUTDOWN &&

  8.                ! (rs == SHUTDOWN &&

  9.                   firstTask == null &&

  10.                   ! workQueue.isEmpty()))

  11.                return false;

  12.            for (;;) {

  13.                int wc = workerCountOf(c);

  14.                if (wc >= CAPACITY ||

  15.                    wc >= (core ? corePoolSize : maximumPoolSize))

  16.                    return false;

  17.                if (compareAndIncrementWorkerCount(c))

  18.                    break retry;

  19.                c = ctl.get();  // Re-read ctl

  20.                if (runStateOf(c) != rs)

  21.                    continue retry;

  22.                // else CAS failed due to workerCount change; retry inner loop

  23.            }

  24.        }

  25.        boolean workerStarted = false;

  26.        boolean workerAdded = false;

  27.        Worker w = null;

  28.        try {

  29.            w = new Worker(firstTask);

  30.            final Thread t = w.thread;

  31.            if (t != null) {

  32.                final ReentrantLock mainLock = this.mainLock;

  33.                mainLock.lock();

  34.                try {

  35.                    // Recheck while holding lock.

  36.                    // Back out on ThreadFactory failure or if

  37.                    // shut down before lock acquired.

  38.                    int rs = runStateOf(ctl.get());

  39.                    if (rs < SHUTDOWN ||

  40.                        (rs == SHUTDOWN && firstTask == null)) {

  41.                        if (t.isAlive()) // precheck that t is startable

  42.                            throw new IllegalThreadStateException();

  43.                        workers.add(w);

  44.                        int s = workers.size();

  45.                        if (s > largestPoolSize)

  46.                            largestPoolSize = s;//这里这里!

  47.                        workerAdded = true;

  48.                    }

  49.                } finally {

  50.                    mainLock.unlock();

  51.                }

  52.                if (workerAdded) {

  53.                    t.start();

  54.                    workerStarted = true;

  55.                }

  56.            }

  57.        } finally {

  58.            if (! workerStarted)

  59.                addWorkerFailed(w);

  60.        }

  61.        return workerStarted;

  62.    }

发现largestPoolSize是worker集合的大小,但是注意,并不是worker集合中的所有worker都处于工作状态。因此这里结论就出来了:线程池的容量,值得是同时活跃(运行)的线程池个数;largestPoolSize的大小是线程池曾创建的线程个数,跟线程池的容量无关。

PS:杨青同学是这篇文章的灵感来源,他做了很多压测。给了我很多思路,并跟我一起分析了一些代码。


以上是关于Java线程池监控小结的主要内容,如果未能解决你的问题,请参考以下文章

JAVA 线程与线程池简单小结

java线程池监控

Java Executors小结

Java并发线程池监控

Java——线程池

Java线程池详解