Java线程池线程复用的秘密

Posted 郭霖

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java线程池线程复用的秘密相关的知识,希望对你有一定的参考价值。



今日科技快讯


近日,第二届世界智能大会于天津梅江会展中心举行。本届世界智能大会以“智能时代:新进展、新趋势、新举措”为主题,在探讨人工智能产、学、研、用的同时,区块链成为大会热词,阿里巴巴董事局主席马云、浪潮集团董事长兼CEO孙丕恕、中国工程院院士邬贺铨都不约而同的看好区块链技术。


作者简介


周一早上好!经过周末的调整,新的一周继续努力!

本篇来自 Neo 的投稿,分享了Java线程池的相关知识,一起来看看!希望大家喜欢。


前言


去年面试的时候,被问到过线程池如何实现复用以达到节约线程资源的目的。当时回答比较简单,当时并不是很清楚线程池如何做到复用一个线程。今天我们就以   Executors.newCachedThreadPool() 方法创建的线程池为例,探究线程复用的秘密。Here we go!


线程池


我们先来看看,创建一个线程池需要哪些参数。

  • corePoolSize 核心线程数大小。当提交一个任务时,如果当前线程数小于corePoolSize,就会创建一个线程。即使其他有可用的空闲线程。

  • runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列:

    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等上一个元素被移除之后,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。

    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

  • 不同的runnableTaskQueue对线程池运行逻辑有很大影响

  • maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。

  • keepAliveTime 线程执行结束后,保持存活的时间。

  • ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

  • RejectedExecutionHandler 线程池队列饱和之后的执行策略,默认是采用AbortPolicy。JDK提供四种实现方式:

    • AbortPolicy:直接抛出异常

    • CallerRunsPolicy :只用调用者所在线程来运行任务

    • DiscardOldestPolicy 丢弃队列里最近的一个任务,并执行当前任务

    • DiscardPolicy : 不处理,丢弃掉

  • TimeUnit: keepalive的时间单位,可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

我们来看看 Executors.newCachedThreadPool() 里面的构造:

public static ExecutorService newCachedThreadPool() {
       return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                     60L, TimeUnit.SECONDS,
                                     new SynchronousQueue<Runnable>());
   }
  • corePoolSize 为 0,意味着核心线程数是 0。

  • maximumPoolSize 是 Integer.MAX_VALUE ,意味这可以一直往线程池提交任务,不会执行 reject 策略。

  • keepAliveTime 和 unit 决定了线程的存活时间是 60s,意味着一个线程空闲60s后才会被回收。

  • reject 策略是默认的 AbortPolicy,当线程池超出最大限制时抛出异常。不过这里 CacheThreadPool 的没有最大线程数限制,所以 reject 策略没用。

  • runnableTaskQueue 是 SynchronousQueue。该队列的特点是一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。使用该队列是实现 CacheThreadPool 的关键之一。

SynchronousQueue 的详细原理参考这里:

https://blog.csdn.net/yanyan19880509/article/details/52562039

我们看看 CacheThreadPool 的注释介绍,大意是说当有任务提交进来,会优先使用线程池里可用的空闲线程来执行任务,但是如果没有可用的线程会直接创建线程。空闲的线程会保留 60s,之后才会被回收。这些特性决定了,当需要执行很多短时间的任务时,CacheThreadPool 的线程复用率比较高, 会显著的提高性能。而且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool 并不会占用很多资源。

注释简单明了说明了 CacheThreadPool 的特性和适用场景,我们后面在阅读代码的过程中,会对注释的说明有进一步的理解。

终于到了要进入源码的时候,天天看郭神博客让我学到一个技巧,必须带着问题去看阅读,不管是看书还是看代码,这样才能事半功倍。那么问题来了:

  1. CacheThreadPool 如何实现线程保留60s。

  2. CacheThreadPool 如何实现线程复用。

带着这两个问题,去源码里寻找答案吧~

首先我们向线程池提交任务一般用 execute() 方法,我们就从这里入手:

public void execute(Runnable command) {
 if (command == null)
           throw new NullPointerException();
       //1.如果当前存在的线程少于corePoolSize,会新建线程来执行任务。然后各种检查状态
       int c = ctl.get();
       if (workerCountOf(c) < corePoolSize) {
           if (addWorker(command, true))
               return;
           c = ctl.get();
       }
       //2.如果task被成功加入队列,还是要double-check
       if (isRunning(c) && workQueue.offer(command)) {
           int recheck = ctl.get();
           if (! isRunning(recheck) && remove(command))
               reject(command);
           else if (workerCountOf(recheck) == 0)
               addWorker(null, false);
       }
       //3.如果task不能加入到队列,会尝试创建线程。如果创建失败,走reject流程
       else if (!addWorker(command, false))
           reject(command);

第一步比较简单,如果当前运行的线程少于核心线程,调用 addWorker(),创建一个线程。但是因为 CacheThreadPool 的 corePoolSize 是0,所以会跳过这步,并不会创建核心线程。

关键在第二步,首先判断了线程池是否运行状态,紧接着调用 workQueue.offer() 往对列添加 task 。 workQueue 是一个 BlockingQueue ,我们知道 BlockingQueue.offer() 方法是向队列插入元素,如果成功返回 true ,如果队列没有可用空间返回 false 。 CacheThreadPool 用的是 SynchronousQueue ,前面了解过 SynchronousQueue 的特性,添加到 SynchronousQueue 的元素必须被其他线程取出,才能塞入下一个元素。等会我们再来看看哪里是从 SynchronousQueue 取出元素。这里当任务入队列成功后,再次检查了线程池状态,还是运行状态就继续。然后检查当前运行线程数量,如果当前没有运行中的线程,调用 addWorker() ,第一个参数为 null 第二个参数是 false ,标明了非核心线程。

为什么这里 addWorker() 第一个方法要用null?带着这个疑问,我们来看看 addWorker() 方法:

private boolean addWorker(Runnable firstTask, boolean core) {
    //...这里有一段cas代码,通过双重循环目的是通过cas增加线程池线程个数
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
     //...省略部分代码
     workers.add(w);
     //...省略部分代码
     workerAdded=true;
     if (workerAdded) {
       t.start();
       workerStarted = true;
       }
}

源代码比较长,这里省略了一部分。过程主要分成两步,第一步是一段 cas 代码通过双重循环检查状态并为当前线程数扩容 +1,第二部是将任务包装成 worker 对象,用线程安全的方式添加到当前工作 HashSet() 里,并开始执行线程。

终于读到线程开始执行的地方了,里程碑式的胜利啊同志们!

但是我们注意到,task 为 null ,Worker 里面的 firstTask 是 null ,那么 wokrer thread 里面是怎么工作下去的呢?继续跟踪代码,Worker 类继承 Runnable 接口,因此 worker thread start 后,走的是 worker.run()方法:

public void run() {
   runWorker(this);
}

继续进入 runWorker() 方法:

final void runWorker(Worker w) {
       Thread wt = Thread.currentThread();
       Runnable task = w.firstTask;
       w.firstTask = null;
       w.unlock(); // allow interrupts
       boolean completedAbruptly = true;
     //省略代码
           while (task != null || (task = getTask()) != null) {
               //..省略
               try {
                   beforeExecute(wt, task);
                   Throwable thrown = null;
                   try {
                       task.run();
                   } catch (Exception x) {
                       thrown = x; throw x;
                   }
               //省略代码
           }
       //省略代码
   }

可以看到这里判断了 firstTask 如果为空,就调用 getTask() 方法。getTask() 方法是从 workQueue 拉取任务。所以到这里之前的疑问就解决了,调用 addWorker(null,false) 的目的是启动一个线程,然后再 workQueue 拉取任务执行。继续跟踪 getTask() 方法:

private Runnable getTask() {
   boolean timedOut = false; // Did the last poll() time out?

   for (;;) {
       //..省略

       // Are workers subject to culling?
       boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
      //..省略

       try {
           Runnable r = timed ?
               workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
               workQueue.take();
           if (r != null)
               return r;
           timedOut = true;
       } catch (InterruptedException retry) {
           timedOut = false;
       }
   }
}

终于看到从 workQueue 拉取元素了。 CacheThreadPool 构造的时候 corePoolSize 是 0,allowCoreThreadTimeOut 默认是 false ,因此 timed 一直为 true ,会调用 workQueue.poll() 从队列拉取一个任务,等待 60s, 60s后超时,线程就会会被回收。如果 60s 内,进来一个任务,会发生什么情况?任务在 execute() 方法里,会被 offer() 进 workQueue ,因为目前队列是空的,所以 offer 进来后,马上会被阻塞的 worker.poll() 拉取出来,然后在 runWorker() 方法里执行,因为线程没有新建所以达到了线程的复用。至此,我们已经明白了线程复用的秘密,以及线程保留 60s 的实现方法。回到 execute() 方法,还有剩下一个逻辑

//3.如果task不能加入到队列,会尝试创建线程。如果创建失败,走reject流程
else if (!addWorker(command, false))
   reject(command);

因为 CacheThreadPool 用的 SynchronousQueue ,所以没有空闲线程, SynchronousQueue 有一个元素正在被阻塞,那么就不能加入到队列里。会走到 addWorker(commond,false) 这里,这个时候因为就会新建线程来执行任务。如果 addWorker() 返回 false 才会走 reject 策略。那么什么时候 addWorker() 什么时候会返回false呢?我们看代码:

private boolean addWorker(Runnable firstTask, boolean core) {
       retry:
       for (;;) {
           int c = ctl.get();
           int rs = runStateOf(c);
           1.线程池已经shutdown,或者提交进来task为ull且队列也是空,返回false
           if (rs >= SHUTDOWN &&
               ! (rs == SHUTDOWN &&
                  firstTask == null &&
                  ! workQueue.isEmpty()
))
               return false
;

           for (;;) {
               int wc = workerCountOf(c);
               2.如果需要创建核心线程但是当前线程已经大于corePoolSize 返回false,如果是非核心线程但是已经超出maximumPoolSize,返回false
               if (wc >= CAPACITY ||
                   wc >= (core ? corePoolSize : maximumPoolSize)
)
                   return false
;
               if (compareAndIncrementWorkerCount(c))
                   break retry;
               c = ctl.get();  // Re-read ctl
               if (runStateOf(c) != rs)
                   continue retry;
               //省略代码。。。
               if (rs < SHUTDOWN ||
                       (rs == SHUTDOWN && firstTask == null)) {
                       if (t.isAlive())
                       throw new IllegalThreadStateException();
                       //省略代码。。。
                   }
           }
       }
 //省略代码。。。
     }

addWorker() 有以下情况会返回 false :

  1. 线程池已经 shutdown,或者提交进来 task 为ull且同时任务队列也是空,返回 false。

  2. 如果需要创建核心线程但是当前线程已经大于 corePoolSize 返回 false,如果是非核心线程但是已经超出 maximumPoolSize ,返回 false。

  3. 创建线程后,检查是否已经启动。

我们逐条检查。第一点只有线程池被 shutDown() 才会出现。第二点由于 CacheThreadPool 的 corePoolSize 是 0 , maximumPoolSize  是 Intger.MAX_VALUE ,所以也不会出现。第三点是保护性错误,我猜因为线程允许通过外部的 ThreadFactory 创建,所以检查了一下是否外部已经 start,如果开发者编码规范,一般这种情况也不会出现。

综上,在线程池没有 shutDown 的情况下,addWorker() 不会返回 false ,不会走reject流程,所以理论上 CacheThreadPool 可以一直提交任务,符合CacheThreadPool注释里的描述。

引申

Executors 还提供了这么一个方法 Executors.newFixedThreadPool(4) 来创建一个有固定线程数量的线程池,我们看看创建的参数:

public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>());
   }

参数中核心线程和最大线程一样,线程保留时间 0 ,使用 LinkedBlockingQueue 作为任务队列,这样的线程池有什么样的特性呢?我们看看注释说明,大意是说这是一个有着固定线程数量且使用无界队列作为线程队列的线程池。如果有新的任务提交,但是没有线程可用,这个任务会一直等待直到有可用的线程。如果一个线程因为异常终止了,当线程不够用的时候会再创建一个出来。线程会一直保持,直到线程池 shutDown。

和 CacheThreadPool 相比,FixedThreadPool 注释里描述的特性有几个不同的地方。

  1. 因为 corePoolSize == maximumPoolSize ,所以FixedThreadPool只会创建核心线程。

  2. 在 getTask() 方法,如果队列里没有任务可取,线程会一直阻塞在 LinkedBlockingQueue.take() ,线程不会被回收。

  3. 由于线程不会被回收,会一直卡在阻塞,所以没有任务的情况下, FixedThreadPool 占用资源更多。

FixedThreadPool 和 CacheThreadPool 也有相同点,都使用无界队列,意味着可用一直向线程池提交任务,不会触发 reject 策略。


总结


好了,又到了总结的时候,相信各位认真看完的应该对链表的基本操作非常熟悉了.

CacheThreadPool 的运行流程如下:

  1. 提交任务进线程池。

  2. 因为 corePoolSize 为0的关系,不创建核心线程。

  3. 尝试将任务添加到 SynchronousQueue 队列。

  4. 如果SynchronousQueue 入列成功,等待被当前运行的线程空闲后拉取执行。如果当前运行线程为0,调用addWorker( null , false )创建一个非核心线程出来,然后从 SynchronousQueue 拉取任务并在当前线程执行,实现线程的复用。

  5. 如果 SynchronousQueue 已有任务在等待,入列失败。因为 maximumPoolSize 无上限的原因,创建新的非核心线程来执行任务。

纵观整个流程,通过设置 ThreadPoolExecutor 的几个参数,并加上应用 SynchronousQueue 的特性,然后在 ThreadPoolExecutor 的运行框架下,构建出了一个可以线程复用的线程池。ThreadPoolExecutor 还有很强的扩展性,可以通过自定义参数来实现不同的线程池。这么牛X的代码,这辈子写是不可能写得出来了,争取能完全读懂吧。。

谢谢阅读,相信看完这篇文章的你,下次被问到线程池相关的问题,再也不会答不上来了吧~


欢迎长按下图 -> 识别图中二维码

以上是关于Java线程池线程复用的秘密的主要内容,如果未能解决你的问题,请参考以下文章

Java 线程池中的线程复用是如何实现的?

线程复用:线程池

含源码解析,深入Java 线程池原理

java 线程池机制的原理是啥?

Java——线程池

Java线程池中的线程复用是如何实现的