线程池相关源码阅读笔记 | 手记

Posted Rinhowl

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池相关源码阅读笔记 | 手记相关的知识,希望对你有一定的参考价值。

Sep 6 2018 | DELIBERATE PRATICE |

ABOUT THREAD POOL | 线程池 |

Tags | CODE | READING |


这几天断断续续看了线程池相关的源码,以下是我做的笔记,作为输出。




根据『由上而下』的学习原则,先来看看顶层接口Executor.

一、Executor接口

这是一个线程池相关的顶层接口,里面只声明了一个方法:

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */

    void execute(Runnable command);
}

从方法名和参数就可以猜出来,这个方法就是用来执行传入的任务的。

那么下面来看看谁继承/实现了Executor这个接口。

二、ExecutorService接口

这个接口继承了Executor接口,追加了一些方法如:shutdown、submit、invokeAll、invokeAny。

那么这些方法都是干什么的呢?我粗略阅读源码,有如下大概的理解:  
1)shutdown:关闭Executor     ;  
2)submit:提交任务并执行,但返回值Future是什么鬼?  
3)invokeAll:执行给定的任务集,返回一个Future集合,其中包含所有完成或者超时到期的状态和结果。
4)invokeAny:传入任务集,如果有一个任务成功执行的话就返回结果,正常执行或者发生异常,未完成的任务都将被取消。

然后我就有一个问题,这个接口应该怎么被使用?

[注]:关于Future:Executor就是Runnable和Callable的调度容器,Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果、设置结果操作。(有待学习!)

三、AbstractExecutorService抽象类

AbstractExecutorService实现了ExecutorService这个接口,这个抽象类除了实现了接口中的大部分方法外,还增加了newTaskFor这个方法,根据待执行任务和默认值创建一个FutureTask,其实就是对Runnable实例进行再次包装,这个方法在类内部的被调用。

我看了一下submit方法的实现,还真有意思。先根据任务创建了一个FutureTask对象,然后调用继承字自接口Executor的execute方法执行FutureTask,最后返回这个FutureTask对象。然后我就有点纳闷了,execute方法不是还没有实现么?想到这个抽象类还只是实现了部分接口中的方法,所以就可以理解了,父类定义好执行任务的流程,具体是如何执行任务的,将留给子类去实现。

四、Executors类

这个类很独立,没有实现任何接口也没有继承自父类/父接口。值得关注的是里面的四个静态方法,分别是:

1) public static ExecutorService newFixedThreadPool(int nThreads):创建固定线程数量的线程池;  
2) public static ExecutorService newSingleThreadExecutor():创建单一线程的线程池  
3) public static ExecutorService newCachedThreadPool():创建可缓存的线程池  
4) public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize):创建可以定时执行任务的线程池

拿newFixedThreadPool这个方法,来看看其中到底是如何创建一个线程池的。

在这个方法中,就是创建了一个ThreadPoolExecutor(线程池处理器)对象,这个对象的详解,请看第五节内容。

public static ExecutorService newFixedThreadPool(int nThreads) {
        //构造一个核心线程数和最大线程数都是nThreads的线程池
        //存活时间为0L,因为是固定大小,所以这个参数用不到
        //时间单位设置为TimeUnit.MILLISECONDS
        //工作队列为LinkedBlockingQueue
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

五、ThreadPoolExecutor类

这个类继承自AbstractExecutorService类,是一个具体用来实现任务的执行等相关操作的类。

直接看参数列表最多的构造函数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

参数列表的参数都是什么意思呢?

corePoolSize:核心池的大小;  
maximumPoolSize:线程池允许的最大线程数量;  
keepAliveTime:线程存活时间;
unit:参数keepAliveTime的时间单位;  
workQueue:工作队列,用来保存待执行的任务;  
threadFactory:线程工厂,在Executor创建一个线程的时候使用到;  
handler:处理策略,当任务执行被阻塞的时候,也就是工作队列的容量饱和的时候,调用该处理策略

下面,再来看看execute方法的实现。这个方法是在Executor接口中定义的,用来执行传入的任务,可能会创建一个新的线程也可能是使用线程池中的线程来执行。

通过看源码中的注释文档,这个方法的处理有三大步骤:

  1. 如果当前运行线程少于corePoolSize,便尝试创建新的线程来执行传入的任务。

  2. 如果任务成功添加到工作队列,那么仍然需要仔细检查是否应该添加一个线程(因为自上次检查后现有的线程已经死亡),或者自从进入此方法后池关闭了。 所以需要重新检查状态,如果线程池关闭了则需要回滚入队,或者如果线程池没有线程则创建新的线程来执行该任务。

  3. 如果不能添加任务到工作队列中,那么创建一个新的线程。如果失败,则表明线程池已经关闭或者工作队列已经饱和,因此拒绝该任务。

其中有几个方法值得关注:  
1)ctl.get():返回ctl的值,ctl是控制状态的属性,封装了workCount和runState,是一个AtomicInteger类型的变量;  
2)workerCountOf(int c): 传入ctl,获取当前运行线程总数workCount;  
3)runStateOf(int c):传入ctl,获取当前线程池的状态,有Running,Shutdown,Stop,Tidying,Terminate五种状态;    
4)addWorker(Runnable command, boolean core):添加工作线程来执行任务。首先根据当前线程池的状态和给定边界(核心池的大小或最大线程数量)判断是否可以添加新的工作线程,如果可以,还需要在获取到锁之后再作一次检验,防止在等待锁的时候线程池关闭或者线程工厂失效,二次检验的思路很值得学习!然后将工作线程添加到线程池中,这里的线程池是使用HashSet实现的,最后在对工作者计数做相应的调整。  
5)isRunning(int c):传入ctl,判断线程池是否正在运行。  
6)remove(Runnable command):将任务从工作队列中移除,该任务将不会在执行。  
7)reject(Runnable command):  拒绝执行传入的任务。

然后再来看execute的源码就容易理解了。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        //工作线程数<核心线程数
        if (workerCountOf(c) < corePoolSize) {
            //创建一个工作线程来执行该任务
            if (addWorker(command, true))
                return;
             //因为工作线程数已经更新,所以ctl的值要重新获取
            c = ctl.get();
        }
        //如果线程池正在运行并成功将任务添加到工作队列
        if (isRunning(c) && workQueue.offer(command)) {
            // 重新检测
            int recheck = ctl.get();
            // 如果线程池没有运行,则将任务从工作队列中移除(回滚入队)
            if (! isRunning(recheck) && remove(command))
                //拒绝处理该任务
                reject(command);
             //如果当前线程池的线程数量为0
            else if (workerCountOf(recheck) == 0)
                //创建新的工作线程,从工作队列中取出任务执行
                addWorker(nullfalse);
            // 否则,任务将添加进工作队列中等待被执行
        }
        // 如果创建新的工作线程返回false,表明线程池已经停止,或线程工厂无法创建线程,则拒绝处理该任务。
        else if (!addWorker(command, false))
            reject(command);
    }

在查看上面源码的时候,对addWorker方法中的其中两行代码还挺感兴趣的:

             w = new Worker(firstTask);
            final Thread t = w.thread;

不禁想知道这个Worker类到底是什么,里面是封装了线程?

Worker其实就是一个工作线程,它作为ThreadPoolExecutor的内部类存在,实现了Runnable接口。

重点来看它的构造函数:

Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

构造函数主要做的工作就是将外部传入的任务赋值给成员变量firstTask,然后通过线程工厂并传入自身作为线程任务来构建一个线程对象,并赋值给成员变量thread。

这里调用了threadFactory的newThread()方法来创建线程,其实内部源码也是通过new Thread()的方式创建的线程

创建了Worker之后,就是创建了一个工作线程,其中封装了线程对象和线程任务,在addWorker方法的后面,会得到Worker对象中的线程对象,并调用start()方法来启动该线程。

当该线程得到cpu时间切片之后,就执行run()方法,实际上调用了ThreadPoolExecutor的runWorker()方法。

而runWorker()方法通过获取当前运行的线程,也就是刚才启动的线程,来获得firstTask,然后在调用firstTask的run()方法来执行任务。所以经过这么多的操作,最终就是为了让线程池的线程执行到传入任务对象的run()方法。

当然,这个方法(runWork)中还有一些操作值得注意:  
1)当获取到这个工作线程中的firstTask之后,会立马将其设置为null,防止下一次重复执行;  


2)当该工作线程开始工作的时候,并不是只把传入的firstTask执行完就可以“休息”了,而是要不断地从工作队列中获取正在阻塞的工作任务来执行,所以源码的设计就是当该工作线程中包装的第一个任务执行完后,就从工作队列中取任务,直到getTask()方法返回为空,才执行processWorkerExit方法来结束当先线程的生命,从线程池中"消失"。  


3)在正式执行任务对象的run()方法之前和之后,分别调用了beforeExecute()方法和afterExecute()方法,这两个方法用于在任务前后执行一些自定义的操作,可以根据自己的需求扩展线程池的功能,比如获取当前运行的线程数量。这两个方法是空的,留给继承类去填充功能。

问题:线程池如何保持“活性”的?

我尝试写了一个demo,创建一个大小为2的线程池,然后执行一个任务,但是执行完任务后线程池并不会自动关闭,线程池是怎么设计到这一点的?我大概知道是线程阻塞,但是是在哪里阻塞了呢?我就开始看源码找啊。

就我的demo来说,只有一个线程任务,就叫任务A吧。所以在第一次执行execute方法的时候,线程池会创建一个把任务A当做为firstTask的工作线程,然后执行任务A,执行完之后,这个线程就会去工作队列中获取任务,但这时候的工作队列其实是空的,我想问题就应该在这里。

我认为,理想的状态下,线程在此时想要从工作队列获取任务的时候,由于工作队列为空而获取不到所以阻塞在这里,等待有新任务的添加。

事实是不是这样子的呢?看源码求证一下。

private Runnable getTask() {
        boolean timedOut = false// Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 1,rs>=STOP,一定满足rs>=SHUTDOWN,表明不再接受新任务,也不处理队列中的任务
            // 2,rs>=SHUTDOWN,表明不再接受新的任务,但工作队列为空
            // 以上两种情况都会返回null ,让runWorker退出while循环,也就是当前线程结束了。
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // 如果coreThread允许超时,那么超过corePoolSize的线程必定有超时
            // 一般来说,coreThread即使是空闲也不会被回收,只要超过corePoolSize的线程才会,所以allowCoreThreadTimeOut一般为false
            // 所以timed的值取决于wc > corePoolSize的值
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 当前线程数 > 最大线程数 或者 当前线程超时
            // 当前线程数 > 1 或者 工作队列为空
            // 返回null结束当前线程
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            //考虑coreThread不会超时的情况:
            //如果当前线程数超过corePoolSize,则以指定的超时时间从工作队列取
            //否则,核心线程直接以阻塞的形式从工作队列中取得任务(查看take()源码)
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

从上面的源码分析可以知道,当第一次执行excute的方法的时候,wc<corePoolSize,所以会以阻塞的形式从工作队列中获取任务,但是当前的工作队列为空,所以就会一直等待,直到有任务添加进工作队列。

第二次调用execute方法的时候和第一种情况类似,都是会先创建一个新的线程执行传入的任务,执行完该任务后再去工作队列中取。根据take()的源码,上一个线程获得了队列的锁,正在阻塞等待任务的添加。而此时当前线程是因为调用take()方法获取不到锁而阻塞。

如果是第三次调用execute方法,会是什么情况呢?根据execute方法的源码可知,会将任务直接添加到工作队列中。关键代码:

//调用workQueue.offer(command)将任务添加至工作队列中
if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(nullfalse);
        }

问题:如何关闭线程池?

经过上一个问题的讨论,线程池保持存活的原因是因为线程池中有线程阻塞,如果线程池中没有线程池阻塞,线程池就会关闭。可以通过一个demo来佐证这个观点。

        // 创建一个线程池但不执行任何任务,线程池将并不会保持'活性'
        ExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();

但是一旦线程池中有线程进入阻塞状态,该怎么关闭这个线程池呢?来看看ThreadPoolExecutor是怎么设计的。

来看看shutdown()方法的源码:

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 检验调用者是否有许可关闭每一条线程
            checkShutdownAccess();
            // 将线程池状态修改为SHUTDOWN,如果至少是这个状态就直接返回
            advanceRunState(SHUTDOWN);
            // 顾名思义,中断空闲工作线程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 尝试终止线程池
        tryTerminate();
    }

再来看看shutdownNow()方法的源码,跟shutdown方法很相似但还是有区别的:

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 将线程池的状态修改为STOP
            advanceRunState(STOP);
            // 中断所有工作线程
            interruptWorkers();
            // 排出工作队列中还未执行的任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

看完上面两段代码,我只对三个方法比较感兴趣,分别是:interruptIdleWorkers()、interruptWorkers()、tryTerminate(),现在逐个再来看看源码。

1) interruptIdleWorkers()  其中调用的是有参数的interruptIdleWorkers()方法,所以具体来看这个方法的源码:

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 通过遍历取出线程池的线程
            for (Worker w : workers) {
                Thread t = w.thread;
                // 判断线程没有被中断,而且可以获取到锁
                // w.tryLock()返回为true则代表能获取到锁,表明该工作线程处于空闲
                // 因为在runWorker方法中循环里,首先就需要执行w.lock(),所以可以获取到锁就表明该工作线程空闲
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        // 中断该线程
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

2) interruptWorkers() 这个方法就简单很多,因为少了判断,所以直接遍历就可以了。

private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }


void interruptIfStarted() {
            Thread t;
            // state初始化为-1
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

3)tryTerminate() 这个方法会将线程池的状态设置为“终止”状态,并伴随着一些自定义的操作等。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            //1,线程池正在运行状态;
            //2, 线程池的状态至少为TIDYING,表明线程池已经停止或在停止了
            //3,线程池的状态为SHUTDOWN,而且工作队列不为空
            // 以上三种状态直接返回
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            //工作线程不等于0,工作线程还没有关闭,则先关闭
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 进入TIDYING状态
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 留给子类实现的方法
                        terminated();
                    } finally {
                        // 进入TERMINATED状态
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 继续awaitTermination
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

当执行完t.interupt()方法后,正在阻塞的线程因为中断信号而中断,所以提前结束runWorker方法中的while循环,进入processWorkerExit()方法,中断该线程。

所以线程池的关闭过程大致就是让阻塞的线程中断,然后将线程池的状态修改为TERMINATED来表示线程池已经终止。


关于线程池的知识点,以前也只是了解了概念,并没有深入去学习。现在试着去阅读源码,发现还是可以读懂的

通过阅读源码可以学习到线程池的工作原理,也能见识到很多前辈的牛逼设计。深入阅读源码确实需要花时间,我就暂且阅读到这里啦,以后能力更强的时候再去阅读兴许收获又不一样。

对了,上面的内容只是作为我的阅读笔记,如有错误,欢迎指正!



以上是关于线程池相关源码阅读笔记 | 手记的主要内容,如果未能解决你的问题,请参考以下文章

阅读JDK源码后,我有了优化它的冲动!

Java线程池源码阅读

JDK线程池框架Executor源码阅读

源码阅读:从三个重量级的开源框架中看线程池的应用(redisnginxskynet)

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段

Java线程池底层源码分享和相关面试题(持续更新)