线程池相关源码阅读笔记 | 手记
Posted Rinhowl
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池相关源码阅读笔记 | 手记相关的知识,希望对你有一定的参考价值。
Sep 6 2018 | DELIBERATE PRATICE |
ABOUT THREAD POOL | 线程池 |
Tags | CODE | READING |
这几天断断续续看了线程池相关的源码,以下是我做的笔记,作为输出。
根据『由上而下』的学习原则,先来看看顶层接口Executor
.
一、Executor接口
这是一个线程池相关的顶层接口,里面只声明了一个方法:
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
从方法名和参数就可以猜出来,这个方法就是用来执行传入的任务的。
那么下面来看看谁继承/实现了Executor这个接口。
二、ExecutorService接口
这个接口继承了Executor接口,追加了一些方法如:shutdown、submit、invokeAll、invokeAny。
那么这些方法都是干什么的呢?我粗略阅读源码,有如下大概的理解:
1)shutdown:关闭Executor ;
2)submit:提交任务并执行,但返回值Future是什么鬼?
3)invokeAll:执行给定的任务集,返回一个Future集合,其中包含所有完成或者超时到期的状态和结果。
4)invokeAny:传入任务集,如果有一个任务成功执行的话就返回结果,正常执行或者发生异常,未完成的任务都将被取消。
然后我就有一个问题,这个接口应该怎么被使用?
[注]:关于Future
:Executor就是Runnable和Callable的调度容器,Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果、设置结果操作。(有待学习!)
三、AbstractExecutorService抽象类
AbstractExecutorService实现了ExecutorService这个接口,这个抽象类除了实现了接口中的大部分方法外,还增加了newTaskFor这个方法,根据待执行任务和默认值创建一个FutureTask
,其实就是对Runnable实例进行再次包装,这个方法在类内部的被调用。
我看了一下submit方法的实现,还真有意思。先根据任务创建了一个FutureTask对象,然后调用继承字自接口Executor的execute方法执行FutureTask,最后返回这个FutureTask对象。然后我就有点纳闷了,execute方法不是还没有实现么?想到这个抽象类还只是实现了部分接口中的方法,所以就可以理解了,父类定义好执行任务的流程,具体是如何执行任务的,将留给子类去实现。
四、Executors类
这个类很独立,没有实现任何接口也没有继承自父类/父接口。值得关注的是里面的四个静态方法,分别是:
1) public static ExecutorService newFixedThreadPool(int nThreads):创建固定线程数量的线程池;
2) public static ExecutorService newSingleThreadExecutor():创建单一线程的线程池
3) public static ExecutorService newCachedThreadPool():创建可缓存的线程池
4) public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize):创建可以定时执行任务的线程池
拿newFixedThreadPool这个方法,来看看其中到底是如何创建一个线程池的。
在这个方法中,就是创建了一个ThreadPoolExecutor(线程池处理器)对象,这个对象的详解,请看第五节内容。
public static ExecutorService newFixedThreadPool(int nThreads) {
//构造一个核心线程数和最大线程数都是nThreads的线程池
//存活时间为0L,因为是固定大小,所以这个参数用不到
//时间单位设置为TimeUnit.MILLISECONDS
//工作队列为LinkedBlockingQueue
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
五、ThreadPoolExecutor类
这个类继承自AbstractExecutorService类,是一个具体用来实现任务的执行等相关操作的类。
直接看参数列表最多的构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数列表的参数都是什么意思呢?
corePoolSize:核心池的大小;
maximumPoolSize:线程池允许的最大线程数量;
keepAliveTime:线程存活时间;
unit:参数keepAliveTime的时间单位;
workQueue:工作队列,用来保存待执行的任务;
threadFactory:线程工厂,在Executor创建一个线程的时候使用到;
handler:处理策略,当任务执行被阻塞的时候,也就是工作队列的容量饱和的时候,调用该处理策略
下面,再来看看execute方法的实现。这个方法是在Executor接口中定义的,用来执行传入的任务,可能会创建一个新的线程也可能是使用线程池中的线程来执行。
通过看源码中的注释文档,这个方法的处理有三大步骤:
如果当前运行线程少于corePoolSize,便尝试创建新的线程来执行传入的任务。
如果任务成功添加到工作队列,那么仍然需要仔细检查是否应该添加一个线程(因为自上次检查后现有的线程已经死亡),或者自从进入此方法后池关闭了。 所以需要重新检查状态,如果线程池关闭了则需要回滚入队,或者如果线程池没有线程则创建新的线程来执行该任务。
如果不能添加任务到工作队列中,那么创建一个新的线程。如果失败,则表明线程池已经关闭或者工作队列已经饱和,因此拒绝该任务。
其中有几个方法值得关注:
1)ctl.get():返回ctl的值,ctl是控制状态的属性,封装了workCount和runState,是一个AtomicInteger类型的变量;
2)workerCountOf(int c): 传入ctl,获取当前运行线程总数workCount;
3)runStateOf(int c):传入ctl,获取当前线程池的状态,有Running,Shutdown,Stop,Tidying,Terminate五种状态;
4)addWorker(Runnable command, boolean core):添加工作线程来执行任务。首先根据当前线程池的状态和给定边界(核心池的大小或最大线程数量)判断是否可以添加新的工作线程,如果可以,还需要在获取到锁之后再作一次检验,防止在等待锁的时候线程池关闭或者线程工厂失效,二次检验的思路很值得学习!然后将工作线程添加到线程池中,这里的线程池是使用HashSet实现的,最后在对工作者计数做相应的调整。
5)isRunning(int c):传入ctl,判断线程池是否正在运行。
6)remove(Runnable command):将任务从工作队列中移除,该任务将不会在执行。
7)reject(Runnable command): 拒绝执行传入的任务。
然后再来看execute的源码就容易理解了。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//工作线程数<核心线程数
if (workerCountOf(c) < corePoolSize) {
//创建一个工作线程来执行该任务
if (addWorker(command, true))
return;
//因为工作线程数已经更新,所以ctl的值要重新获取
c = ctl.get();
}
//如果线程池正在运行并成功将任务添加到工作队列
if (isRunning(c) && workQueue.offer(command)) {
// 重新检测
int recheck = ctl.get();
// 如果线程池没有运行,则将任务从工作队列中移除(回滚入队)
if (! isRunning(recheck) && remove(command))
//拒绝处理该任务
reject(command);
//如果当前线程池的线程数量为0
else if (workerCountOf(recheck) == 0)
//创建新的工作线程,从工作队列中取出任务执行
addWorker(null, false);
// 否则,任务将添加进工作队列中等待被执行
}
// 如果创建新的工作线程返回false,表明线程池已经停止,或线程工厂无法创建线程,则拒绝处理该任务。
else if (!addWorker(command, false))
reject(command);
}
在查看上面源码的时候,对addWorker方法中的其中两行代码还挺感兴趣的:
w = new Worker(firstTask);
final Thread t = w.thread;
不禁想知道这个Worker类到底是什么,里面是封装了线程?
Worker其实就是一个工作线程,它作为ThreadPoolExecutor的内部类存在,实现了Runnable接口。
重点来看它的构造函数:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
构造函数主要做的工作就是将外部传入的任务赋值给成员变量firstTask,然后通过线程工厂并传入自身作为线程任务来构建一个线程对象,并赋值给成员变量thread。
这里调用了threadFactory的newThread()方法来创建线程,其实内部源码也是通过new Thread()的方式创建的线程
创建了Worker之后,就是创建了一个工作线程,其中封装了线程对象和线程任务,在addWorker方法的后面,会得到Worker对象中的线程对象,并调用start()方法来启动该线程。
当该线程得到cpu时间切片之后,就执行run()方法,实际上调用了ThreadPoolExecutor的runWorker()方法。
而runWorker()方法通过获取当前运行的线程,也就是刚才启动的线程,来获得firstTask,然后在调用firstTask的run()方法来执行任务。所以经过这么多的操作,最终就是为了让线程池的线程执行到传入任务对象的run()方法。
当然,这个方法(runWork)中还有一些操作值得注意:
1)当获取到这个工作线程中的firstTask之后,会立马将其设置为null,防止下一次重复执行;
2)当该工作线程开始工作的时候,并不是只把传入的firstTask执行完就可以“休息”了,而是要不断地从工作队列中获取正在阻塞的工作任务来执行,所以源码的设计就是当该工作线程中包装的第一个任务执行完后,就从工作队列中取任务,直到getTask()方法返回为空,才执行processWorkerExit方法来结束当先线程的生命,从线程池中"消失"。
3)在正式执行任务对象的run()方法之前和之后,分别调用了beforeExecute()方法和afterExecute()方法,这两个方法用于在任务前后执行一些自定义的操作,可以根据自己的需求扩展线程池的功能,比如获取当前运行的线程数量。这两个方法是空的,留给继承类去填充功能。
问题:线程池如何保持“活性”的?
我尝试写了一个demo,创建一个大小为2的线程池,然后执行一个任务,但是执行完任务后线程池并不会自动关闭,线程池是怎么设计到这一点的?我大概知道是线程阻塞,但是是在哪里阻塞了呢?我就开始看源码找啊。
就我的demo来说,只有一个线程任务,就叫任务A吧。所以在第一次执行execute方法的时候,线程池会创建一个把任务A当做为firstTask的工作线程,然后执行任务A,执行完之后,这个线程就会去工作队列中获取任务,但这时候的工作队列其实是空的,我想问题就应该在这里。
我认为,理想的状态下,线程在此时想要从工作队列获取任务的时候,由于工作队列为空而获取不到所以阻塞在这里,等待有新任务的添加。
事实是不是这样子的呢?看源码求证一下。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 1,rs>=STOP,一定满足rs>=SHUTDOWN,表明不再接受新任务,也不处理队列中的任务
// 2,rs>=SHUTDOWN,表明不再接受新的任务,但工作队列为空
// 以上两种情况都会返回null ,让runWorker退出while循环,也就是当前线程结束了。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 如果coreThread允许超时,那么超过corePoolSize的线程必定有超时
// 一般来说,coreThread即使是空闲也不会被回收,只要超过corePoolSize的线程才会,所以allowCoreThreadTimeOut一般为false
// 所以timed的值取决于wc > corePoolSize的值
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 当前线程数 > 最大线程数 或者 当前线程超时
// 当前线程数 > 1 或者 工作队列为空
// 返回null结束当前线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
//考虑coreThread不会超时的情况:
//如果当前线程数超过corePoolSize,则以指定的超时时间从工作队列取
//否则,核心线程直接以阻塞的形式从工作队列中取得任务(查看take()源码)
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
从上面的源码分析可以知道,当第一次执行excute的方法的时候,wc<corePoolSize,所以会以阻塞的形式从工作队列中获取任务,但是当前的工作队列为空,所以就会一直等待,直到有任务添加进工作队列。
第二次调用execute方法的时候和第一种情况类似,都是会先创建一个新的线程执行传入的任务,执行完该任务后再去工作队列中取。根据take()的源码,上一个线程获得了队列的锁,正在阻塞等待任务的添加。而此时当前线程是因为调用take()方法获取不到锁而阻塞。
如果是第三次调用execute方法,会是什么情况呢?根据execute方法的源码可知,会将任务直接添加到工作队列中。关键代码:
//调用workQueue.offer(command)将任务添加至工作队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
问题:如何关闭线程池?
经过上一个问题的讨论,线程池保持存活的原因是因为线程池中有线程阻塞,如果线程池中没有线程池阻塞,线程池就会关闭。可以通过一个demo来佐证这个观点。
// 创建一个线程池但不执行任何任务,线程池将并不会保持'活性'
ExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
但是一旦线程池中有线程进入阻塞状态,该怎么关闭这个线程池呢?来看看ThreadPoolExecutor是怎么设计的。
来看看shutdown()方法的源码:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检验调用者是否有许可关闭每一条线程
checkShutdownAccess();
// 将线程池状态修改为SHUTDOWN,如果至少是这个状态就直接返回
advanceRunState(SHUTDOWN);
// 顾名思义,中断空闲工作线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
}
再来看看shutdownNow()方法的源码,跟shutdown方法很相似但还是有区别的:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 将线程池的状态修改为STOP
advanceRunState(STOP);
// 中断所有工作线程
interruptWorkers();
// 排出工作队列中还未执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
看完上面两段代码,我只对三个方法比较感兴趣,分别是:interruptIdleWorkers()、interruptWorkers()、tryTerminate(),现在逐个再来看看源码。
1) interruptIdleWorkers() 其中调用的是有参数的interruptIdleWorkers()方法,所以具体来看这个方法的源码:
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 通过遍历取出线程池的线程
for (Worker w : workers) {
Thread t = w.thread;
// 判断线程没有被中断,而且可以获取到锁
// w.tryLock()返回为true则代表能获取到锁,表明该工作线程处于空闲
// 因为在runWorker方法中循环里,首先就需要执行w.lock(),所以可以获取到锁就表明该工作线程空闲
if (!t.isInterrupted() && w.tryLock()) {
try {
// 中断该线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
2) interruptWorkers() 这个方法就简单很多,因为少了判断,所以直接遍历就可以了。
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
// state初始化为-1
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
3)tryTerminate() 这个方法会将线程池的状态设置为“终止”状态,并伴随着一些自定义的操作等。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//1,线程池正在运行状态;
//2, 线程池的状态至少为TIDYING,表明线程池已经停止或在停止了
//3,线程池的状态为SHUTDOWN,而且工作队列不为空
// 以上三种状态直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//工作线程不等于0,工作线程还没有关闭,则先关闭
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 进入TIDYING状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 留给子类实现的方法
terminated();
} finally {
// 进入TERMINATED状态
ctl.set(ctlOf(TERMINATED, 0));
// 继续awaitTermination
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
当执行完t.interupt()方法后,正在阻塞的线程因为中断信号而中断,所以提前结束runWorker方法中的while循环,进入processWorkerExit()方法,中断该线程。
所以线程池的关闭过程大致就是让阻塞的线程中断,然后将线程池的状态修改为TERMINATED来表示线程池已经终止。
关于线程池的知识点,以前也只是了解了概念,并没有深入去学习。现在试着去阅读源码,发现还是可以读懂的
通过阅读源码可以学习到线程池的工作原理,也能见识到很多前辈的牛逼设计。深入阅读源码确实需要花时间,我就暂且阅读到这里啦,以后能力更强的时候再去阅读兴许收获又不一样。
对了,上面的内容只是作为我的阅读笔记,如有错误,欢迎指正!
以上是关于线程池相关源码阅读笔记 | 手记的主要内容,如果未能解决你的问题,请参考以下文章
源码阅读:从三个重量级的开源框架中看线程池的应用(redisnginxskynet)
newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段