通过实例来透彻分析线程池的源码---ThreadPoolExecutor

Posted 小猪快跑22

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过实例来透彻分析线程池的源码---ThreadPoolExecutor相关的知识,希望对你有一定的参考价值。

前言

其实大概1年前就想把线程池的源码完整的撸一遍了,但是看的时候太注重细节了,比如其中的CAS操作、AQS以及ReentrantLock的lock、tryLock等,结果就是跑偏了;所幸今年年初的时候有时间,就把AQS的源码、CAS的使用以及ReentrantLock等一些并发编程的源码撸了一遍,再来看线程池的源码,感觉还是非常的舒爽,哈哈哈。共勉。

文章中设计到的 CAS操作,以及AQS操作可以参考我之前的博文:

CAS操作指南
AQS源码详解

讲线程池的原理之前,先得了解一下线程池中几个重要的概念。

  1. 核心线程数 (corePoolSize:核心线程的数量;它的作用可以这样理解:向线程池中添加任务,如果线程池中的线程数量小于 corePoolSize,那么直接新建线程执行任务;如果线程池中的线程数量大于corePoolSize,那么就会往 阻塞队列workQueue中添加任务,此时如果阻塞队列满了且线程池中的线程数量小于最大线程数 maximumPoolSize,那么也会新建一个线程执行任务;如果阻塞队列满且线程数量大于最大线程数maximumPoolSize,那么会执行饱和策略,默认的策略是抛弃要加入的任务。

  2. 最大线程数maximumPoolSize):如果阻塞队列满了,则判断线程池中的线程数量是否小于 maximumPoolSize,是则直接新建一个线程来处理任务,否则执行饱和策略。

  3. 阻塞队列**(workQueue)**:线程池中的线程数量大于核心线程的数量,则将新建的任务加入到阻塞队列。

  4. 空闲线程的存活时间 (keepAliveTime):线程空闲下来之后,线程的存活时间,超过这个时间还没有任务执行,则结束该线程。注意,这个回收只是回收非核心线程,比方说核心线程数是2,最大线程数是6,假设任务非常多,最后创建了6个线程来执行任务,最后后回收4个非核心线程,而核心线程不会回收,除非你任务设置要回收核心线程。

  5. 饱和策略 (RejectedExecutionHandler):当等待队列已满,线程数也达到最大线程数时,线程池会根据饱和策略来执行后续操作,默认的策略是抛弃要加入的任务。

下面来一张图来说明下:

一、线程池的几种状态:

  1. RUNNING: 运行状态,能够接受新的任务且会处理阻塞队列中的任务。
  2. SHUTDOWN:关闭状态,不接受新任务,但是会处理阻塞队列中的任务,执行线程池的 shutDown()对应的就是此状态。
  3. STOP: 停止状态,不接受新的任务,也不会处理等待队列中的任务并且会中断正在执行的任务。调用线程池的 shutDownNow()对应的是此状态
  4. TIDYING: 整理,即所有的任务都停止了,线程池中线程数量等于0,会调用 terminated()如果你自己实现线程池的话。
  5. TERMINATED: 结束状态,terminated()方法执行完了。

下面是一些重要的变量注释:

    //CAS, 它的高三位表示线程池的状态,低29位表示线程池中现有的线程数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //表示线程池线程数的bit数
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //最大的线程数量,数量是完全够用了 0001 1111 1111 1111 1111 1111 1111 1111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    //1110 0000 0000 0000 0000 0000 0000 0000
    private static final int RUNNING    = -1 << COUNT_BITS;
    //0000 0000 0000 0000 0000 0000 0000 0000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //0010 0000 0000 0000 0000 0000 0000 0000
    private static final int STOP       =  1 << COUNT_BITS;
    //0100 0000 0000 0000 0000 0000 0000 0000
    private static final int TIDYING    =  2 << COUNT_BITS;
    //0110 0000 0000 0000 0000 0000 0000 0000
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    //获取线程池的状态
    private static int runStateOf(int c)      return c & ~CAPACITY; 
    //获取线程池中已创建线程的数量
    private static int workerCountOf(int c)   return c & CAPACITY; 
    //组装状态和数量,成为ctl
    private static int ctlOf(int rs, int wc)  return rs | wc; 

    
    private static boolean runStateLessThan(int c, int s) 
        return c < s;
    
    
 
    private static boolean runStateAtLeast(int c, int s) 
        return c >= s;
    
    //判断线程是否在运行
    private static boolean isRunning(int c) 
        return c < SHUTDOWN;
    

二、线程池的添加任务的源码解析:

我这里为了能将的清楚,我将线程池的主要代码都Copy了,加了些注释,其他的没变。

为了说明例子,我这里的线程池定义如下:

 ExecutorService executorService = new ThreadPoolExecutor(1, 2, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));

其中 核心线程数等于1,最大线程数等于2,空闲线程的存活时间为100毫秒,阻塞队列是大小为1的数组类型的队列,此队列只能存放一个任务且不会自动扩容

下面我new了3个任务放进线程池,如下:

		Runnable runnable1 = () -> 
            Log.e("test thread pool", "r1 start");
            try 
                Thread.sleep(3_000);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            Log.e("test thread pool", "r1 end");
        ;

        Runnable runnable2 = () -> 
            Log.e("test thread pool", "r2 start");
            try 
                Thread.sleep(3_000);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            Log.e("test thread pool", "r2 end");
        ;

        Runnable runnable3 = () -> 
            Log.e("test thread pool", "r3 start");
            try 
                Thread.sleep(3_000);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            Log.e("test thread pool", "r3 end");
        ;
			// 直线线程池的添加任务:
            executorService.execute(runnable1);
            executorService.execute(runnable2);
            executorService.execute(runnable3);

按照上面讲的线程池原理那么执行的流程应该是这样:
先创建一个线程来执行Runnable1,此时核心线程数等于1,那么会把Runnable2放到阻塞队列中去,由于阻塞对列只能存放1个任务,且最大线程数等于2,那么会新建一个线程来执行Runnable3。

注意,我这里的3个任务我都sleep(3_000),我为什么这里要让你们注意这里呢?
因为,如果我的任务1的任务很简单就是打印一行日志的话,那么这个任务很快就会执行完,那么可能在执行任务3的时候,任务1已经执行完,那么执行任务1的线程就会去阻塞队列中将任务2出队且执行,那么任务3就会被加入到阻塞队列中。

execute 方法:
   public void execute(Runnable command) 
        if (command == null)
            throw new NullPointerException();       
        
        int c = ctl.get();
        int wtc = workerCountOf(c); // 计算线程池中当前线程的数量
        Log.e("test thread pool", "c = " + c + ", wtc = " + wtc);
        // 如果线程的数量小于核心线程数,那么直接提交任务,并且创建线程来执行任务
        if (wtc < corePoolSize) 
            if (addWorker(command, true))
                return;
             // 如果提交任务失败,可能是线程池执行了shunDown或shutDownNow操作,
             // 那么重新获取ctl的值,执行下面的流程
            c = ctl.get();
            Log.e("test thread pool", "addWorker failed c = " + c);
        
        // 如果线程池是运行状态,那么将任务添加到阻塞队列,执行到这里只有2个条件:
        // 条件1:核心线程数已满
        // 条件2:线程池执行了shunDown或shutDownNow操作
        if (isRunning(c) && workQueue.offer(command)) 
            int recheck = ctl.get();
            Log.e("test thread pool", "recheck = " + recheck);
            // 如果当前线程池不是运行状态,那么将任务从阻塞队列中移除并执行拒绝策略
            if (!isRunning(recheck) && remove(command)) 
                Log.e("test thread pool", "...reject...");
                reject(command);
             else if (workerCountOf(recheck) == 0)  // 如果线程池中线程数量等于0,那么就添加一个空任务,目的就是继续执行阻塞队列中的任务
                Log.e("test thread pool", "...addWorker a null task...");
                addWorker(null, false);
            
        
        // 如果核心线程数已满且阻塞队列已满,那么就开启一个新线程来执行任务,
        // 如果添加失败则执行抛弃策略
        // 这里面的失败的条件,一般是执行下面addWorker(command, false)的时候,
        // 另外一个线程执行了线程池的shutDown()操作,这种情况基本不会出现,
        //因为线程池的操作如extcute或shutDown一般都是主线程中的,
        // 所以 addWorker和shutdown都是顺序执行的,不会出现失败的情况。
         else if (!addWorker(command, false))  
            Log.e("test thread pool", "...reject..2....");
            reject(command);
        
    

按照之前自定义的线程池executorService = new ThreadPoolExecutor(1, 2, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1))执行的结果如下:

只看我用绿框框出来的即可,其他log是我跟踪分析的时候用的。

结果分析:

开始时因为线程池中线程个数是0,而且核心线程数是1,所以直接创建一个线程来执行任务1;

添加任务2的时候,此时线程池中线程的个数等于1了,即核心线程已满,所以将任务2添加到阻塞队列中;

添加任务3的时候,核心线程已满且阻塞队列已满(这里我队列的大小设置为1,即只能存放1个任务),但是线程池中的线程数小于最大的线程数(2个),所以会新建一个线程执行任务3。从上面的日志可以看出,执行任务1和任务3的线程分别是 :pool-1-thread-1 和 pool-1-thread-2

日志还可以看出,执行任务1和任务2的线程都是 pool-1-thread-1,这里面涉及到的就是线程池中线程的复用,到底是怎么实现的呢?

那就得看看 之前的addWorker方法啦:

    private boolean addWorker(Runnable firstTask, boolean core) 
        retry:
        for (; ; ) 
            int c = ctl.get();
            int rs = runStateOf(c); // 获取线程池的状态
            Log.e("test thread pool", "addWorker rs = " + rs + ", firstTask = " + firstTask + ", ....... core = " + core);
           //如果线程池的状态到了SHUTDOWN或者之上的状态时候,只有一种情况还需要继续添加线程,
            //那就是线程池已经SHUTDOWN,但是队列中还有任务在排队,而且不接受新任务(所以firstTask必须为null)
           //这里还继续添加线程的目的是,尽快完成阻塞队列中的任务
            if (rs >= SHUTDOWN &&
                    !(rs == SHUTDOWN &&
                            firstTask == null &&
                            !workQueue.isEmpty()))
                return false;

            for (; ; ) 
                // 获取线程个数
                int wc = workerCountOf(c);
                // 如果线程数大于CAPACITY 或者线程数大于等于核心线程数或者最大线程数
                // 表示添加任务失败
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 线程池中线程的个数加1,如果成功的话直接跳出最外层的for循环    
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 检测当前线程状态如果发生了变化,则继续回到retry,重新开始循环
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            
        

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try 
           // 用Worker类包装任务,真正的执行任务就是在Worker中
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) 
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock(); // 加锁
                try 
                    int rs = runStateOf(ctl.get()); // 获取线程池的状态

					// rs < SHUTDOWN 表示线程池是运行状态
					// (rs == SHUTDOWN && firstTask == null) 表示线程池执行了shutDown,
				   // 且阻塞队列中还有任务,这时候需要添加一个空的任务,即创建新的线程来加速阻塞队列中的任务尽快完成
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) 
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    
                 finally 
                    mainLock.unlock();
                
                if (workerAdded) 
                    Log.e("test thread pool", "addWorker start");
                    // 这里真正的在线程中处理任务啦
                    t.start();
                    workerStarted = true;
                
            
         finally 
            // 添加任务出错的机制
            if (!workerStarted)
                addWorkerFailed(w);
        
        return workerStarted;
    

addWorker的代码注释的很清楚,代码也比较简单,下面主要看看里面真正的任务执行类 Worker,和 添加任务失败的方法 addWorkerFailed

真正的任务执行类 Worker

Worker 的代码很简单,主要就是继承了AQS来加、解锁,以及创建线程来执行任务,构造函数如下:

       Worker(Runnable firstTask) 
            setState(-1); // 这里面就是设置AQS中state的值为-1,用途是调用shutDown时根据状态来响应中断操作的,
            // 要执行的 Runnable任务
            this.firstTask = firstTask;
            // 通过线程工厂方法来创建新的线程
            this.thread = getThreadFactory().newThread(this);
        

真正执行的任务方法 runWorker

    final void runWorker(Worker w) 
        Log.e("test thread pool", "runWorker begin task = " + w.firstTask);
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try 
           // 如果task你为空,或者去阻塞队列中去取任务不为空,这里的getTask 如果阻塞队列中任务为空 会阻塞当前线程
            // 这里就是线程复用的核心,比方说当这个程执行完当前任务后,就去队列中取任务来执行,这就完成了线程的复用
            while (task != null || (task = getTask()) != null) 
                Log.e("test thread pool", "runWorker  while task = " + task);
                w.lock();
               // 第一个条件只要调用了shutDownNow才会成立,如果调用了shutDownNow 那么就会执行线程的中断即中断正在执行的任务
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted()) 
                    Log.e("test thread pool", "runWorker  interrupt");
                    wt.interrupt(); // 中断操作
                
                try 
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try 
                        task.run(); // 开始执行任务
                     catch (RuntimeException x) 
                        thrown = x;
                        throw x;
                     catch (Error x) 
                        thrown = x;
                        throw x;
                     catch (Throwable x) 
                        thrown = x;
                        throw new Error(x);
                     finally 
                        afterExecute(task, thrown);
                    
                 finally 
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                
            
            completedAbruptly = false;
         finally 
            Log.e("test thread pool", "runWorker  processWorkerExit");
            // 处理线程完成所有任务后的操作,注意,这里可能执行不到,为什么?
            // 因为上面注释说了,getTask取队列中的任务时,
            // 如果队列为空,那么就会阻塞住当前线程,所有这里就执行不到了;
            // 但是调用shutDown就能执行到这里了,这也是为什么我们经常看到线程池的例子都是excute(r)后调用shutDown()
            processWorkerExit(w, completedAbruptly);
        
    
下面再看看 getTask 方法

getTask 方法 就是从阻塞队列中获取待执行的任务,按照先进先出的原则取任务。

    private Runnable getTask() 
        boolean timedOut = false; // Did the last poll() time out?

        for (; ; ) 
            int c = ctl.get();
            int rs = runStateOf(c); // 获取线程池状态
            Log.e("test thread pool", "--getTask  ----- c = " + c + ", rs = " + rs);
            // 线程池的状态是大于等于 SHUTDOWN 且 状态大于等于STOP或者阻塞队列为空
            // 这里满足的条件有2种:
            // 1. 调用shutDown 后直到阻塞队列中的任务都执行完
            // 2. 调用 shutDownNow() 后线程池的状态就变成STOP了
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))  // **注1**
                decrementWorkerCount(); // 线程池中的线程数减1
                Log.e("test thread pool", "--getTask  c = " + ctl.get());
                // retur

以上是关于通过实例来透彻分析线程池的源码---ThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章

java线程池源码的简单分析

线程池的使用及ThreadPoolExecutor的execute和addWorker源码分析

通过ThreadPoolExecutor源码分析线程池实现原理

Java 线程池 ThreadPoolExecutor源码简析

Java 线程池 ThreadPoolExecutor源码简析

手撕ArrayList底层,透彻分析源码