详解Java线程池运行机制(结合源码)

Posted 海涛技术漫谈

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了详解Java线程池运行机制(结合源码)相关的知识,希望对你有一定的参考价值。

欢迎关注同名博客:

http://blog.csdn.net/zhanht


由于项目原因,近期使用多线程较为频繁,因此对多线程产生了浓厚的兴趣,也一直想弄懂它的原理,但每次都是浅尝而止。


决定提笔写这文章源于前两天code review的时候,组里的一位同事无意间提了一个问题:

线程池中,如果一个线程中的任务执行过程中,抛了异常,会发生什么?线程池中的线程个数会因此变少吗?


思考这个问题的过程中,我意识到,是时候对线程池知识做一个总结了。

接着往下看之前,也希望各位可以花个几分钟自己在自己脑子里好好思考下这个问题,然后带着问题往下接着走。


接下来,本文将结合JDK1.8的ThreadPoolExecutor类源码,从如下几个方面对线程池进行详细的阐述:


  1. 线程池的重要性

  2. 线程池整体工作示意图

  3. 线程池的创建

  4. 任务的提交

  5. 总结


一: 线程池的重要性


我记得李智慧老师的《大型网站技术架构》一书中说到,提高性能和可用性有三驾马车,分别是:缓存,异步和集群。而线程池正是异步的常用手段,

他合适使得主逻辑和附加逻辑进行解耦,能提高响应速度,提高性能,还能在高并发情况下达到削峰的作用。


在如今多核处理器的时代,多线程开发发挥着巨大的作用,总的来说,我认为它至少有如下三个明细优势:


     (1) 降低cpu等资源消耗:通过对线程的重用,避免不断的创建和销毁线程占用大量的cpu等资源

     (2) 提高速度和性能:利用多线程进行异步,并行 处理任务

     (3) 简单方便:线程池统一进行线程的管理,调度


二: 线程池整体工作示意图



该示意图描述了线程池的整个运行过程:

(1) 通过submit(Callable<T> task ) 或者 execute(Runnable command)方法提交任务,submit方法其实最后也是调用的execute方法,只是调用前将任务封装成了FutureTask,它会将任务运行的结果或者任务抛出的异常封装进 Future中,返回给调用方,这样主线程就能拿到返回值或者异常信息了。


(2) 如果线程池中线程个数小于corePoolSize,则新建一个线程用来执行这个任务。线程池,其实说白了就是一个装有Worker的HashSet,Worker是线程池中的一个内部类,它有两个重要属性:线程和任务

线程池新建线程其实就是new 一个Worker然后加进Set中,这个Worker会新建一个线程,并在此线程中,进行任务的处理。


(3) 如果线程个数达到corePoolSize时,任务则会被放进队列进行排队


(4) 如果队列也放满了,则会继续新建Worker,直到个数达到maxPoolSize


(5) 如果线程个数已经达到maxPoolSize,并且队列也满了,则会走拒绝策略,常见策略包括:主线程执行(CallerRunsPolicy),抛异常(AbortPolicy)和丢弃(DiscardPplicy)三种


三: 线程池的创建


可以通过ThreadPoolExecutor的构造方法直接创建线程池,更常见的一般是使用Executors中的工厂方法进行创建。


如果项目使用的spring框架,建议使用ThreadPoolTaskExecutor类,然后corePoolSize, maxPoolSize,queueCapacity和拒绝策略通过配置进行合理的配置。


orePoolSize, maxPoolSize和queueCapacity大小怎么设置,可以参考本期的另外一篇文章。


四: 任务的提交


前面介绍了,submit方法最后也是调用的execute方法,只是将任务进行了适当的包装处理,因此这里详细介绍execute方法。根据章节二的描述可知,提交任务无非就是往Set添加新的Worker,要么是任务入队列排队,或者进行拒绝策略。下面从execute方法开始结合源码进行详细的介绍:


execute作为提交任务的方法,要么新建并添加线程(addWorker),要么入队列(workQueue.offer()),或者走拒绝策略。


  1. public void execute(Runnable command) {

  2.        if (command == null)

  3.            throw new NullPointerException();

  4.       /**

  5.         * ct1是个AtomicInteger类型,int是32位的

  6.         * 前3位用于表示线程池的状态,后29位

  7.         * 表示Worker(线程池中线程)的数量

  8.         * 因此最大的线程个数为2的29次方减一

  9.         */

  10.        int c = ctl.get();

  11.       /**

  12.         * 这块代码主要就是,如果当前线程个数

  13.         *  小于corePoolSize

  14.         * 就addWorker,新创建一个worker

  15.         * 并加入Set<Worker>

  16.         */

  17.        if (workerCountOf(c) < corePoolSize) {

  18.            if (addWorker(command, true))

  19.                return;

  20.            c = ctl.get();

  21.        }

  22.        /**

  23.         * 这块代码是当前线程个数大于等于

  24.         * corePoolSize或者上面addWorker失败

  25.         * 然后进行入队列的操作

  26.         * 入队列成功后,如果线程池状态突变了,

  27.         * 则会出队并拒绝这个任务

  28.         * 接下来会再次检查线程池的线程个数

  29.         * 如果为0,那么就没线程执行队列等待的任务

  30.         * 会addWorker,新建一个

  31.         */

  32.        if (isRunning(c) && workQueue.offer(command)) {

  33.            int recheck = ctl.get();

  34.            if (! isRunning(recheck) && remove(command))

  35.                reject(command);

  36.            else if (workerCountOf(recheck) == 0)

  37.                addWorker(null, false);

  38.        }

  39.        /**

  40.         * 走到这,说明队列满了,会再次addWorker

  41.         * (直到maxPoolSize)

  42.         * 如果失败,则走拒绝策略拒绝这个任务

  43.         */

  44.        else if (!addWorker(command, false))

  45.            reject(command);

  46.    }

execute方法里面的核心,是addWorker,下面看源码:


  1. private boolean addWorker(Runnable firstTask,

  2.                                                  boolean core) {

  3.               // goto用法的label,用来方便的跳出双层循环

  4.        retry:

  5.        /**

  6.         * 这快代码主要是校验线程池状态和线程个数

  7.         * 判断是否还能添加Worker

  8.         */

  9.        for (;;) {

  10.            int c = ctl.get();

  11.            int rs = runStateOf(c);

  12.            /**

  13.             * 这块代码比较绕,读懂的关键在于

  14.             * 把握 &&运算符的短路特性

  15.             * 主要功能是看什么情况下会返回false,

  16.             * 即addWorker失败

  17.             *

  18.             * 首先介绍下,线程池包括如下状态:

  19.             *      RUNNING:可以新加线程,

  20.             *                          同时可以处理queue中的线程

  21.             *      SHUTDOWN:不增加新线程,

  22.             *                              但是处理queue中的线程

  23.             *      STOP:不增加新线程,但是

  24.             *                   处理queue中的线程

  25.             *      TIDYING:所有的线程都终止了 queue中

  26.             *                      )同时workerCount为0,

  27.             *                       那么此时进入TIDYING

  28.             *      TERMINATED:terminated()方法结束

  29.             *                               ,变为TERMINATED

  30.             *

  31.             * 分析可知,addWorker失败有如下情况:

  32.             * 1. rs>SHUTDOWN

  33.             * 2. rs=SHUTDOWN,firstTask != null,

  34.             *      因为shutDown状态下不接受新线程,

  35.             *     只处理queue中的任务

  36.             * 3. rs=SHUTDOWN,firstTask = null,queue为空

  37.             *      此时说明是新增线程用来消耗queue中

  38.             *      的任务,但 queue为空,所以也返回失败

  39.             */

  40.            if (rs >= SHUTDOWN &&

  41.                    ! (rs == SHUTDOWN &&

  42.                            firstTask == null &&

  43.                            ! workQueue.isEmpty()))

  44.                return false;

  45.            /**

  46.             * 上面关注的是线程池的状态,

  47.             *  这块代码关注的则是worker(即线程)的数量

  48.             */

  49.            for (;;) {

  50.                int wc = workerCountOf(c);

  51.                /**

  52.                 * capacity为线程的最大个数,

  53.                 *     受int的29位限制,大于此值直接返回失败

  54.                 * core表示的是以corePoolSize还是

  55.                 *       maximumPoolSize为上限

  56.                 * 最初始阶段,线程是逐渐增加至

  57.                 *       corePoolSize的,core为true

  58.                 * 后面,对列也满了后,就也

  59.                 *         maximumPoolSize为上限了,core为false

  60.                 */

  61.                if (wc >= CAPACITY ||

  62.                      wc >= (core ? corePoolSize : maximumPoolSize))

  63.                    return false;

  64.                // workerCount加1成功后,跳出循环

  65.                if (compareAndIncrementWorkerCount(c))

  66.                    break retry;

  67.                c = ctl.get();  // Re-read ctl

  68.                if (runStateOf(c) != rs)

  69.                    continue retry;

  70.            }

  71.        }

  72.        /**

  73.         * 接下来的这块代码,就是新建线程

  74.         *  并且维护进set中

  75.         */

  76.        boolean workerStarted = false;

  77.        boolean workerAdded = false;

  78.        Worker w = null;

  79.        try {

  80.            // Worker实现了Runnable接口,

  81.            //  内部维护了线程和任务两个重要属性,

  82.            //  关键是它的run()方法

  83.            w = new Worker(firstTask);

  84.            final Thread t = w.thread;

  85.            if (t != null) {

  86.                // HashSet是线程不安全的,所以操作要加锁

  87.                final ReentrantLock mainLock = this.mainLock;

  88.                mainLock.lock();

  89.                try {

  90.                    int rs = runStateOf(ctl.get());

  91.                    /**

  92.                     * rs < SHUTDOWN(即running):可以

  93.                     *      正常添加线程

  94.                     * rs == SHUTDOWN && firstTask == null

  95.                     *    此时添加的是消耗queue中任务的线程

  96.                     */

  97.                    if (rs < SHUTDOWN ||

  98.                            (rs == SHUTDOWN && firstTask == null)) {

  99.                        if (t.isAlive()) // precheck that t is startable

  100.                            throw new IllegalThreadStateException();

  101.                        workers.add(w);

  102.                        int s = workers.size();

  103.                        if (s > largestPoolSize)

  104.                            largestPoolSize = s;

  105.                        workerAdded = true;

  106.                    }

  107.                } finally {

  108.                    mainLock.unlock();

  109.                }

  110.                /**

  111.                 * 添加成功后,启动新worker中的线程

  112.                 * 拿到时间片后,会执行Worker中

  113.                 * 的runWorker()方法

  114.                 */

  115.                if (workerAdded) {

  116.                    t.start();

  117.                    workerStarted = true;

  118.                }

  119.            }

  120.        } finally {

  121.            if (! workerStarted)

  122.                addWorkerFailed(w);

  123.        }

  124.        return workerStarted;

  125.    }


到目前为止,就算是往线程池添加线程成功了,接下来就是线程执行任务。

Worker创建成功后,先执行firstTask,之后工作就是不断的去队列拿任务执行

runWorker()方法:


  1. final void runWorker(Worker w) {

  2.    // 获取当前线程,worker的构造方法

  3.    // 会通过线程工厂方法新建一个线程

  4.    Thread wt = Thread.currentThread();

  5.    // 刚创建worker的firstTask才可能非null,

  6.    //创建用来消耗queue中的任务的firstTask都是给的null

  7.    Runnable task = w.firstTask;

  8.    w.firstTask = null;

  9.    w.unlock();

  10.    boolean completedAbruptly = true;

  11.    try {

  12.        // task不为null执行task,否者通过getTask()去队列拿

  13.        while (task != null || (task = getTask()) != null) {

  14.            w.lock();

  15.            // 再次判断状态

  16.            if ((runStateAtLeast(ctl.get(), STOP) ||

  17.                    (Thread.interrupted() &&

  18.                            runStateAtLeast(ctl.get(), STOP))) &&

  19.                    !wt.isInterrupted())

  20.                wt.interrupt();

  21.            try {

  22.                // 线程池提供的一个钩子方法

  23.                beforeExecute(wt, task);

  24.                Throwable thrown = null;

  25.                try {

  26.                    // 执行任务

  27.                    task.run();

  28.                } catch (RuntimeException x) {

  29.                    thrown = x; throw x;

  30.                } catch (Error x) {

  31.                    thrown = x; throw x;

  32.                } catch (Throwable x) {

  33.                    thrown = x; throw new Error(x);

  34.                } finally {

  35.                    afterExecute(task, thrown);

  36.                }

  37.            } finally {

  38.                task = null;

  39.                w.completedTasks++;

  40.                w.unlock();

  41.            }

  42.        }

  43.        /**

  44.         *  这里需要注意下

  45.         *   completedAbruptly默认是true

  46.         *   只有当上面的循环正常退出

  47.         *   即队列任务执行完了,才会设为false

  48.         *  任务执行过程中抛异常,依然未true

  49.        */

  50.        completedAbruptly = false;

  51.    } finally {

  52.        /**

  53.        * 注意这里,这个方法实现就是

  54.        *  文章开头问题的答案所在

  55.        * 如果线程执行任务过程中抛了异常

  56.        * 会发生什么

  57.        */

  58.        processWorkerExit(w, completedAbruptly);

  59.    }

  60. }


runWorker()方法比较简单,关键在于去队列拿任务的getTask()方法,和线程异常关闭处理的 processWorkerExit方法,下面分别进行阐述:


getTask()方法:


  1. private Runnable getTask() {

  2.        // 判断最后的poll是否要超时,

  3.        // 用于考量当前的线程个数是否富余

  4.        boolean timedOut = false;

  5.        for (;;) {

  6.            int c = ctl.get();

  7.            int rs = runStateOf(c);

  8.            /**

  9.             * 如果rs=shutdown,queue为空:

  10.             *         减少workerCount并返回null

  11.             * 如果rs>=stop

  12.             *         同样减少workerCount并返回null

  13.             */

  14.            if (rs >= SHUTDOWN && (rs >= STOP

  15.                                         || workQueue.isEmpty())) {

  16.                decrementWorkerCount();

  17.                return null;

  18.            }

  19.            int wc = workerCountOf(c);

  20.            /**

  21.             *  允许核心线程超时退出 或者

  22.             *   当前worker个数 > corePoolSize时

  23.             *  timed = true, 表示当前线程允许退出

  24.             */

  25.            boolean timed = allowCoreThreadTimeOut

  26.                              || wc > corePoolSize;

  27.            /**

  28.             * 这里关键在于分析if里面什么时候为true

  29.             * (1): wc > maximumPoolSize, 为true,

  30.             *        减少线程数,返回null

  31.             * (2): 1 < wc <= maximumPoolSize,

  32.             *        里面简化为 timed && timedOut

  33.             *        允许线程退出,并且当前线程富余

  34.             *        才减少线程

  35.             * (3): wc <= 1, 里面简化为:

  36.             *        timed && timedOut && workQueue.isEmpty()

  37.             *     此时只有一个工作线程,只要队列不为空

  38.             *     就得一直处理,不能减小线程数

  39.             */

  40.            if ((wc > maximumPoolSize || (timed && timedOut))

  41.                    && (wc > 1 || workQueue.isEmpty())) {

  42.                if (compareAndDecrementWorkerCount(c))

  43.                    return null;

  44.                continue;

  45.            }

  46.            try {

  47.                /**

  48.                 * 1. timed为true,当前线程允许退出,

  49.                 *      通过poll从队列拿

  50.                 *      拿到直接返回任务,如果超时时间内没拿到,

  51.                 *      说明任务不多,但线程多

  52.                 *      即worker有富余,因此timedOu 置为 true

  53.                  *      上面就能减少线程数了

  54.                 * 2. timed为false,不允许当前线程退出

  55.                  *    拿不到任务一直阻塞

  56.                 */

  57.                Runnable r = timed ?

  58.                        workQueue.poll(keepAliveTime,

  59.                                     TimeUnit.NANOSECONDS) :

  60.                        workQueue.take();

  61.                if (r != null)

  62.                    return r;

  63.                timedOut = true;

  64.            } catch (InterruptedException retry) {

  65.                timedOut = false;

  66.            }

  67.        }

  68.    }


processWorkerExit()方法:


  1. private void processWorkerExit(Worker w,

  2.                                boolean completedAbruptly) {

  3.        // 如果异常退出,workerCount先减一

  4.        if (completedAbruptly)

  5.            decrementWorkerCount();

  6.        final ReentrantLock mainLock = this.mainLock;

  7.        mainLock.lock();

  8.        try {

  9.            // 任务抛异常了,线程池也认为

  10.            // 此线程完成了它的工作

  11.            completedTaskCount += w.completedTasks;

  12.            workers.remove(w);

  13.        } finally {

  14.            mainLock.unlock();

  15.        }

  16.        tryTerminate();

  17.        int c = ctl.get();

  18.        if (runStateLessThan(c, STOP)) {

  19.            // 线程异常退出,不会进if,直接addWorker

  20.            if (!completedAbruptly) {

  21.                /**

  22.                 * 这里面处理的是线程正常退出逻辑

  23.                 * 如果允许核心线程超时退出,

  24.                  *      并且队列不为空,有任务需要

  25.                  *      处理,则 min = 1

  26.                 * 如果不允许核心线程超时退出

  27.                 *     则min = corePoolSize

  28.                 *

  29.                 * 如果当前的线程数小于min

  30.                  *    则新建一个线程进行补位

  31.                 */

  32.                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

  33.                if (min == 0 && ! workQueue.isEmpty())

  34.                    min = 1;

  35.                if (workerCountOf(c) >= min)

  36.                    return;

  37.            }

  38.            addWorker(null, false);

  39.        }

  40.    }



五: 总结


至此,关于线程池原理就介绍完了,下面进行简单的总结。

线程池,其实说白了就是一个 Set<Worker>,Worker是线程池的内部类,它实现了Runnable接口,维护了线程和任务,当新建一个Worker时,它的构造方法内会新创建一个线程用来执行任务,他首先条件入参传进来的任务,执行完后,就不管的从workQueue中不断的通过getTask()从队列获取任务来执行。


以上是关于详解Java线程池运行机制(结合源码)的主要内容,如果未能解决你的问题,请参考以下文章

Java线程池详解

Java线程池详解

Java 线程池详解

深入浅出Java并发编程指南「源码分析篇」透析ThreadPoolExecutor线程池运作机制和源码体系

Java线程池Executor框架详解

SpringBoot 整合线程池及各参数详解