知其然而知其所以然~线程池深入源码分析-手把手debug源码系列

Posted 张子行的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了知其然而知其所以然~线程池深入源码分析-手把手debug源码系列相关的知识,希望对你有一定的参考价值。

本文将从为什么使用线程池 ?—> 线程池的使用方式----> 线程池源码逐步分析线程池

为什么使用线程池?

  1. 降低资源开销:通过重复利用已创建的线程降低创建销毁线程带来的开销
  2. 提高响应速度:当有任务到达时,任务可以不需要等待线程创建就可以立即执行
  3. 提高线程的可管理性:使用线程池可以对线程进行统一的监控管理

线程池的常见创建形式有哪几种?

  1. 通过Executors对象创建我们线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
  1. 通过实例化ThreadPoolExecutor创建我们的线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                10,
                20,
                1,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(5),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

通过ThreadPoolExecutor这种方式创建线程池可能稍微有点繁琐,但是对比通过Executors创建的线程池,可控性会提高很多。推荐使用ThreadPoolExecutor()

构建ThreadPoolExecutor参数释义

  1. 核心线程数(corePoolSize)
  2. 最大线程数(maximumPoolSize)
  3. 空闲线程活跃时间(keepAliveTime)
  4. 阻塞队列(queue)
  5. 饱和策略(policy)
    在这里插入图片描述

核心线程数~最大线程数~阻塞队列之间的关系图解

  1. 如果线程数小于核心线程数那么正常执行任务
  2. 如果线程数大于核心线程数且线程池状态为Running,且阻塞队列未满,那么将当前线程添加到阻塞队列。如果此时线程池状态不是运行状态,移除任务执行饱和策略,反之开启一个新的work,从阻塞队列中拿取任务然后运行
  3. 如果阻塞队列已经满了,当前任务添加到阻塞队列失败,执行reject(饱和策略)
    在这里插入图片描述

创建好的线程池后,我们直接将任务submit()或者execute()就好了。

submit与execute有什么区别?

  • submit用于执行有返回值的任务(CallAble)
    在这里插入图片描述
  • execute用于执行没有返回值的任务(RunnAble)
    在这里插入图片描述

好了前言就到这吧,书写如下demo开始debug线程池源码

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                10,
                20,
                1,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(5),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        FutureTask<Integer> result = new FutureTask<Integer>(new task2());
        /**
         * execute用于执行没有返回值的任务
         */
        threadPoolExecutor.execute(new task());
        /**
         * submit用于执行有返回值的任务。
         */
        Future<?> submit = threadPoolExecutor.submit(new task2());
        System.out.println(submit.get());
        threadPoolExecutor.shutdown();
    }

    static class task implements Runnable {
        @Override
        public void run() {
            System.out.println("task runnable");
        }
    }

    static class task2 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("task callable");
            return 1;
        }
    }

上述demo运行结果图
在这里插入图片描述

直接在 threadPoolExecutor.execute(new task()) 这断点,看线程池是如何执行任务的?
在这里插入图片描述

线程池是如何执行任务的?

public void execute(Runnable command) {
		/**
		* 任务为null直接抛出异常
		*/
        if (command == null)
            throw new NullPointerException();
            
        int c = ctl.get();
        //如果线程数量小于核心线程数,
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

ThreadPoolExecute变量是如何设计的?

俺看了一眼上述的代码execute()直呼瑟瑟发抖。先别急研究上面的代码,先来分析一下ThreadPoolExecutor类中的各个变量的含义如下图

在这里插入图片描述
从上图可知线程池的状态有那几种?

  1. 运行状态:RUNNING = -1 << COUNT_BITS;
  2. 关闭状态:SHUTDOWN = 0 << COUNT_BITS;
  3. 停止状态:STOP = 1 << COUNT_BITS;
  4. 过度状态:TIDYING = 2 << COUNT_BITS;
  5. 结束状态:TERMINATED = 3 << COUNT_BITS;

COUNT_BITS = 32 - 3 = 29

 private static final int COUNT_BITS = Integer.SIZE - 3;

-1 的补码为:11111111111111111111111111111111

RUNNING = -1 << COUNT_BITS = -1 << 29 = 11100000000000000000000000000000

所以 RUNNING = 11100000000000000000000000000000 = -536870912
同理可得 SHUTDOWN = 00000000000000000000000000000000 = 0
同理可得 STOP = 00100000000000000000000000000000 = 536870912
同理可得 TIDYING = 01000000000000000000000000000000 = 1073741824
同理可得 TERMINATED = 01100000000000000000000000000000 = 1610612736

小结观察每个线程状态的高三位都是独一无二的,高三位记录着线程运行状态,而后29位记录着对应状态的线程数(work数量)。(初始状态后29位组成全是0)

如何确定线程池的状态?

runStateOf:确定线程池的状态的方法

 COUNT_BITS = 32 - 3 = 29
 private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 private static int runStateOf(int c)     { return c & ~CAPACITY; }

CAPACITY = 1<<29 -1 = 00011111111111111111111111111111(线程的最大容量)

~CAPACITY = 11100000000000000000000000000000

c & ~CAPACITY 计算出来的结果永远只与 c 的高三位有关,因此通过 c&~CAPACITY 操作可以确定线程池的运行状态

不论是运行状态还是什么别的状态,它们的前三位都是独一无二的,也就是说高三位是记录线程池的运行状态的

如何得到线程池中线程数量?

workerCountOf:可以确定线程池中的线程数量(work对象的数量)

private static int workerCountOf(int c)  { return c & CAPACITY; }
  • CAPACITY = 1<<29 -1 = 00011111111111111111111111111111

由于 CAPACITY 高三位全是0 ,所以c & CAPACITY的结果只与低29位有关。
例如:假设线程数为1 , 1转换成补码为 00000000000000000000000000000001 与 CAPACITY做 & 运算,得到的结果为 1。

如何初始化线程池状态、线程初始数量?

ctlOf:初始化线程池状态、数量的方法

 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 private static int ctlOf(int rs, int wc) { return rs | wc; }

拿ctlOf(RUNNING, 0)的返回值举例 RUNNING|0 = 11100000000000000000000000000000 | 0 = 11100000000000000000000000000000。
即:初始化了线程池状态为RUNNING 且初始线程数量是 0 。

开始真正分析线程池执行流程(execute)

  1. 如果线程数小于核心线程数那么正常执行任务
  2. 如果线程数大于核心线程数且线程池状态为Running,且阻塞队列未满,那么将当前线程添加到阻塞队列。如果此时线程池状态不是运行状态,移除任务执行饱和策略,反之从阻塞队列中拿取任务然后运行
  3. 如果阻塞队列已经满了,当前任务添加到阻塞队列失败,执行reject(饱和策略)
public void execute(Runnable command) {
		/**
		* 任务为null直接抛出异常
		*/
        if (command == null)
            throw new NullPointerException();
        /**
        * 获取线程状态、数量。默认线程状态为为运行状态
        */
        int c = ctl.get();
        //如果当前工作线程数量 < 核心线程数量
        if (workerCountOf(c) < corePoolSize) {
        	//将任务添加到works(hashset)中,同时尝试运行当前任务和阻塞队列中的任务
        	//如果线程池中线程数量大于核心线程数那么会拒绝运行此任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果线程池状态为运行状态,此时工作线程数量大于核心线程数量,如果此时阻塞队列没有满,那么将任务尝试放入阻塞队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //重新检查线程池状态如果此时是非运行状态,移除任务,同时执行
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
            //开启一个work,运行阻塞队列中的任务
            //如果当前线程池数量大于最大线程数量那么添加任务失败
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

addWorker

addWorker(Runnable firstTask, boolean core) 第二个参数:如果core 为
true,那么表示如果线程数大于核心线程数将会拒绝任务的启动,如果为false,表示如果大于最大线程数,将会拒绝任务的启动。

此方法关系到线程的启动,先过一遍代码注释,下面有总结。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
//下面判断条件等价于:rs >= SHUTDOWN && rs != SHUTDOWN || rs >= SHUTDOWN && firstTask != null || rs >= SHUTDOWN && workQueue.isEmpty()
            //1.线程池状态不是运行状态,是STOP、TIDYING、TERMINATED中的一种拒绝执行任务
            //2.线程池状态不是运行状态,是其他状态中的一种,任务不为null,将会拒绝任务
            //3.线程池状态不是运行状态,是其他状态中的一种,,任务队列为空,将会拒绝任务
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
//死循环
            for (;;) {
            //线程数
                int wc = workerCountOf(c);
                //1.线程数超过了最大容量将会拒绝任务
                //2.线程数大于核心线程数,或者大于最大最大线程数,将会拒绝任务
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                 //CAS操作,对work数量加一
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //其他CAS操作失败的线程一直自旋
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
        //将任务包装成一个Worker对象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    //1.线程池是运行状态,将任务添加至工作集合
                    //2.任务为null且线程池是SHUTDOWN状态,也将任务添加进工作集合
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //判断线程是否存活
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                            //works为一个hashSet
                        workers.add(w);
                        //工作集合的大小
                        int s = workers.size();
                        if (s > largestPoolSize)
                        //更新最大线程池的大小
                            largestPoolSize = s;
                            //标记任务添加成功
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                //开启任务
                    t.start();
                    //标记任务启动成功
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

拒绝任务的情况分析(addWorker失败)

rs >= SHUTDOWN && rs != SHUTDOWN || rs >= SHUTDOWN && firstTask != null || rs >= SHUTDOWN && workQueue.isEmpty()

上面的代码和下面的代码可以等价替换

if (rs >= SHUTDOWN &&
    ! (rs == SHUTDOWN &&
       firstTask == null &&
       ! workQueue.isEmpty()))
  1. 线程池状态不是运行状态,是STOP、TIDYING、TERMINATED中的一种直接拒绝任务
rs >= SHUTDOWN && rs != SHUTDOWN
  1. 要想进入if分支必然只可能是第一点中的 rs != SHUTDOWN 为 false ,也就是说线程池状态不是运行状态,且任务不为null,将会直接拒绝任务
rs >= SHUTDOWN && firstTask != null
  1. 线程池状态不是运行状态,阻塞队列为空,将会直接拒绝任务
rs >= SHUTDOWN && workQueue.isEmpty()
  1. 线程数超过了最大容量将会直接拒绝任务
  2. 线程数大于核心线程数,或者大于最大最大线程数,也会直接拒绝任务
if (wc >= CAPACITY ||
    wc >= (core ? corePoolSize : maximumPoolSize))
    return false;

添加至工作集合(workers)情况分析

  1. 线程池是运行状态,将任务添加至工作集合
  2. 任务为null且线程池是SHUTDOWN状态,也将任务添加进工作集合
if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) 

添加成功后,将会启动对应Work中的Thread
在这里插入图片描述

线程启动流程(start)

运行到 t.start(); 将会来到Work内部的run方法
在这里插入图片描述

runWorker

一直循环如果当前任务不为null且阻塞队列存在任务就一直执行任务,即运行work中的task.run(),里面比较重要的一个点就是

如果线程池正在停止,请确保线程被中断;如果没有,请确保线程不被中断。这需要在第二种情况下重新检查以处理shutdownNow这种情况,同时清除中断。

runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()||( (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) && !wt.isInterrupted()
  1. 如果线程池状态为STOP、TIDYING、TERMINATED中的状态之一且任务线程没有被中断,将会中断当前任务线程
  2. 如果线程池状态为RUNNING或SHUTDOWN,但是当前线程已经中断,重新检查线程池状态如果为STOP、TIDYING、TERMINATED中的状态之一,将会中断当前任务线程。不论线程是否中断task.run()都会运行

这里设置线程中断的目的就是:无论线程池是处于什么状态,我们在task.run()期间都能获取到合理的线程状态

while (task != null || (task = getTask()) != null) {

}

上面这段代码task == null 出现的情况是 addWork(null,false),才会出现task == null,此时将会直接处理阻塞队列中的任务

runWorker源代码

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    //拿到Work对象中的任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
    //task == null 的情况即为addWork(null,false)
    //如果当前任务不为null或者阻塞队列中能获取到任务
        while (task != null || (task = getTask()) != null) {
        //加锁,与shutdown有关
            w.lock();
            //条件可以拆分runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()||( (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) && !wt.isInterrupted() ) 
//1.如果线程池处于STOP、TIDYING、TERMINATED中的状态之一 ,且当前线程没有被中断,中断当前线程
//2.如果第一点不满足,只可能!wt.isInterrupted() 为 true ,runStateAtLeast(ctl.get(), STOP) 为 false。那么当前线程池的状态只可能处于RUNNING或SHUTDOWN状态 如果当前线程(可以理解为第一个任务线程)中断了,此时再重新检查一下线程池状态,如果线程池还处于STOP、TIDYING、TERMINATED中的状态之一,那么中断work中的线程(任务线程)
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
            //空方法实现
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                //运行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                //空方法实现
                    afterExecute(task, thrown);
                }
            } finally {
            //滞空任务
                task = null;
                //记录完成任务的个数
                w.completedTasks++;
                //解锁
                w.unlock();
            }
        }
        深入浅出Java并发编程指南「源码分析篇」透析ThreadPoolExecutor线程池运作机制和源码体系

高并发之——从源码角度分析创建线程池究竟有哪些方式

深入源码分析Java线程池的实现原理

深入源码分析Java线程池的实现原理

Java核心深入理解线程池ThreadPool

Gorm 源码分析