ThreadPoolExecutor任务提交过程源码浅析

Posted lay2017

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolExecutor任务提交过程源码浅析相关的知识,希望对你有一定的参考价值。

线程池是一种重复利用既有线程的池化技术 ,它大量减少了线程的创建初始化过程,也可以防止海量线程创建占尽资源的风险。

任务提交过程

学习使用线程池的使用,我们都大概知道这样一个过程,如图:

技术图片

这个是一个Runnable实例提交到线程池的过程,大体分为4个步骤:

1)判断当前线程数量是否小于核心线程数量,如果小于则创建一个新的线程去执行该任务;

2)如果线程数已经超过了核心线程数,那么就提交到等待工作队列(等待队列的任务将会被既有的线程获取并处理)。

3)如果等待队列已经满了,无法再提交任务,那么将会判断当前线程数是否超过最大线程数。如果没有超过,那么将创建一个新的线程去执行该任务。

4)如果线程数已经超过了最大线程数,那么将调用拒绝策略的处理器(默认处理器实现是抛出一个拒绝的异常)。

DEMO入口

下面,我们以一个DEMO作为入口,了解一下ThreadPoolExecutor的任务提交过程源码实现,如下:

技术图片

DEMO中,第一行代码创建了一个ThreadPoolExecutor实例对象,构造过程传入了几个参数:

1)corePoolSize:核心线程数,一般线程池中存活的最大线程,即使闲置也不会被回收;

2)maximumPoolSize:最大线程数,线程池中允许最大存活的线程数量;

3)keepAliveTime:闲置线程的存活时间,如果没有工作任务,超过存活时间后闲置线程将会被回收;

4)TimeUnit:存活时间的单位;

5)LinkedBlockQueue:一个阻塞队列,存放待处理任务;

ThreadPoolExecutor构造方法源码

我们点开ThreadPoolExecutor实例的构造方法,代码如下:

技术图片

我们看到,该构造方法调用了另外一个构造方法,同时参数增加了两个,一个是ThreadFactory的实例(用于创建Thread对象),另一个是RejectExecutionHandler的实例(拒绝任务提交时调用的处理器)。点击进入另一个构造方法,如:

技术图片

这里一开始进行了参数值校验,后面的部分就是给成员变量赋值(keepAliveTime转成成了纳秒),构造方法即执行结束了。

 submit提交方法的源码

 接下来我们看看submit提交任务的源码,如:

技术图片

submit方法内,显示将Runnable实例包装成了一个RunnableFuture(该接口继承了Runnable和Future)的实现类FutureTask的实例,执行FutureTask也就会调用Runnable。

下面,我们看看execute这个核心方法,点击进入该方法

技术图片

注意:ThreadPoolExecutor通过一个AtomicInteger类型的ctl变量存储了runState(线程池状态)和workerCount(线程数)两个变量。具体实现可以看另外一个博文说明:https://www.cnblogs.com/lay2017/p/10946928.html

execute代码的核心步骤如下:

1、获得workerCount,判断workerCount是否小于corePoolSize,如果小于那么调用addWorker方法添加一个worker(这里的worker可以暂时理解为线程,后面我们会打开addWorker方法看看)执行该任务。

2、如果workerCount大于等于corePoolSize,或者addWorker方法添加worker失败(失败会重新获取ctl值),那么进入第二重判断。

3、第二重判断先是判断了线程池的状态,如果不是运行中则进行第三重判断,如果是运行中,那么尝试着吧任务提交到workQueue(工作队列中),如果提交任务失败也一样进入第三重判断。如果提交成功了,那么会再获取一次ctl并进行二次校验线程池状态(因为在offer到队列以后有可能线程池shutdown了),如果不是running状态,那么将会从队列中移除并调用拒绝策略。如果还是运行状态,但是当前线程数为0,那么为了把已经提交的任务处理掉会调用addWorker启动一个新的线程确保该任务有线程会处理。

4、在第二重判断中,如果线程池不是running状态或者提交到workQueue失败都会进入第三重判断。第三重判断中,如果不是running状态addWorker方法不会调用成功则直接调用reject方法,该方法将调用RejectExecutionHandler处理策略。如果是因为workQueue提交失败,如果线程数不大于MaximumPoolSize的话,addWorker方法将尝试创建一个新的线程去处理,否则调用reject拒绝任务提交。

 我们看到,execute方法的执行逻辑基本和我们一开始所说的任务提交过程图是一致的。

addWorker方法添加一个线程

 我们通过execute方法的逻辑大体了解了任务提交的源码过程,下面我们打开addWorker方法看看如何添加一个线程的。

技术图片

技术图片

技术图片

addWorker方法主要分为两步:

1、在for循环中通过CAS乐观锁把workerCount数量+1

2、线程数增加成功以后,再实际地去创建一个worker实例对象,worker对象中包含任务对象和一个Thread线程对象。worker对象构建完成以后添加到WorkerSet集合容器里面,并调用Thread的start方法,当Thread执行以后会调用worker中的runWorker方法去处理任务(如果当前任务处理完了,会持续不断地从workQueue中获取任务并处理)。

总结

线程池的设计非常地面向对象,ThreadPoolExecutor就像是一个工厂的车间,这个车间里面有固定的几个工作人员(worker)和一个工作台(workQueue)。worker们会从workQueue上获取任务(RunnableFuture)并处理该任务昼夜不歇。如果workQueue上的任务满了,那么会招来更多地worker(创建新的Thread)来帮助处理任务。如果workQueue上的任务处理完了,大家闲在那里无所事事的话多余的worker就会离开(GC回收)留下几个worker驻守在那里(corePoolSize限定个数)等待新的任务来临。

 

以上是关于ThreadPoolExecutor任务提交过程源码浅析的主要内容,如果未能解决你的问题,请参考以下文章

JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- ThreadPoolExecutor(提交任务: submitexecute invokAllinvokeAny)

Java线程池:ThreadPoolExecutor

Java线程池:ThreadPoolExecutor

线程池ThreadPoolExecutor工作原理

ThreadPoolExecutor异常处理

python多线程并行计算通过向线程池ThreadPoolExecutor提交任务的实现方法