ThreadPoolExecutor任务提交过程源码浅析
Posted lay2017
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolExecutor任务提交过程源码浅析相关的知识,希望对你有一定的参考价值。
线程池是一种重复利用既有线程的池化技术 ,它大量减少了线程的创建初始化过程,也可以防止海量线程创建占尽资源的风险。
任务提交过程
学习使用线程池的使用,我们都大概知道这样一个过程,如图:
这个是一个Runnable实例提交到线程池的过程,大体分为4个步骤:
1)判断当前线程数量是否小于核心线程数量,如果小于则创建一个新的线程去执行该任务;
2)如果线程数已经超过了核心线程数,那么就提交到等待工作队列(等待队列的任务将会被既有的线程获取并处理)。
3)如果等待队列已经满了,无法再提交任务,那么将会判断当前线程数是否超过最大线程数。如果没有超过,那么将创建一个新的线程去执行该任务。
4)如果线程数已经超过了最大线程数,那么将调用拒绝策略的处理器(默认处理器实现是抛出一个拒绝的异常)。
DEMO入口
下面,我们以一个DEMO作为入口,了解一下ThreadPoolExecutor的任务提交过程源码实现,如下:
DEMO中,第一行代码创建了一个ThreadPoolExecutor实例对象,构造过程传入了几个参数:
1)corePoolSize:核心线程数,一般线程池中存活的最大线程,即使闲置也不会被回收;
2)maximumPoolSize:最大线程数,线程池中允许最大存活的线程数量;
3)keepAliveTime:闲置线程的存活时间,如果没有工作任务,超过存活时间后闲置线程将会被回收;
4)TimeUnit:存活时间的单位;
5)LinkedBlockQueue:一个阻塞队列,存放待处理任务;
ThreadPoolExecutor构造方法源码
我们点开ThreadPoolExecutor实例的构造方法,代码如下:
我们看到,该构造方法调用了另外一个构造方法,同时参数增加了两个,一个是ThreadFactory的实例(用于创建Thread对象),另一个是RejectExecutionHandler的实例(拒绝任务提交时调用的处理器)。点击进入另一个构造方法,如:
这里一开始进行了参数值校验,后面的部分就是给成员变量赋值(keepAliveTime转成成了纳秒),构造方法即执行结束了。
submit提交方法的源码
接下来我们看看submit提交任务的源码,如:
submit方法内,显示将Runnable实例包装成了一个RunnableFuture(该接口继承了Runnable和Future)的实现类FutureTask的实例,执行FutureTask也就会调用Runnable。
下面,我们看看execute这个核心方法,点击进入该方法
注意:ThreadPoolExecutor通过一个AtomicInteger类型的ctl变量存储了runState(线程池状态)和workerCount(线程数)两个变量。具体实现可以看另外一个博文说明:https://www.cnblogs.com/lay2017/p/10946928.html
execute代码的核心步骤如下:
1、获得workerCount,判断workerCount是否小于corePoolSize,如果小于那么调用addWorker方法添加一个worker(这里的worker可以暂时理解为线程,后面我们会打开addWorker方法看看)执行该任务。
2、如果workerCount大于等于corePoolSize,或者addWorker方法添加worker失败(失败会重新获取ctl值),那么进入第二重判断。
3、第二重判断先是判断了线程池的状态,如果不是运行中则进行第三重判断,如果是运行中,那么尝试着吧任务提交到workQueue(工作队列中),如果提交任务失败也一样进入第三重判断。如果提交成功了,那么会再获取一次ctl并进行二次校验线程池状态(因为在offer到队列以后有可能线程池shutdown了),如果不是running状态,那么将会从队列中移除并调用拒绝策略。如果还是运行状态,但是当前线程数为0,那么为了把已经提交的任务处理掉会调用addWorker启动一个新的线程确保该任务有线程会处理。
4、在第二重判断中,如果线程池不是running状态或者提交到workQueue失败都会进入第三重判断。第三重判断中,如果不是running状态addWorker方法不会调用成功则直接调用reject方法,该方法将调用RejectExecutionHandler处理策略。如果是因为workQueue提交失败,如果线程数不大于MaximumPoolSize的话,addWorker方法将尝试创建一个新的线程去处理,否则调用reject拒绝任务提交。
我们看到,execute方法的执行逻辑基本和我们一开始所说的任务提交过程图是一致的。
addWorker方法添加一个线程
我们通过execute方法的逻辑大体了解了任务提交的源码过程,下面我们打开addWorker方法看看如何添加一个线程的。
addWorker方法主要分为两步:
1、在for循环中通过CAS乐观锁把workerCount数量+1
2、线程数增加成功以后,再实际地去创建一个worker实例对象,worker对象中包含任务对象和一个Thread线程对象。worker对象构建完成以后添加到WorkerSet集合容器里面,并调用Thread的start方法,当Thread执行以后会调用worker中的runWorker方法去处理任务(如果当前任务处理完了,会持续不断地从workQueue中获取任务并处理)。
总结
线程池的设计非常地面向对象,ThreadPoolExecutor就像是一个工厂的车间,这个车间里面有固定的几个工作人员(worker)和一个工作台(workQueue)。worker们会从workQueue上获取任务(RunnableFuture)并处理该任务昼夜不歇。如果workQueue上的任务满了,那么会招来更多地worker(创建新的Thread)来帮助处理任务。如果workQueue上的任务处理完了,大家闲在那里无所事事的话多余的worker就会离开(GC回收)留下几个worker驻守在那里(corePoolSize限定个数)等待新的任务来临。
以上是关于ThreadPoolExecutor任务提交过程源码浅析的主要内容,如果未能解决你的问题,请参考以下文章
JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- ThreadPoolExecutor(提交任务: submitexecute invokAllinvokeAny)