浅谈ThreadPoolExecutor线程池底层源码
Posted 默辨
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅谈ThreadPoolExecutor线程池底层源码相关的知识,希望对你有一定的参考价值。
一、线程池基础知识
线程池具体使用细节不再本篇文章的讨论范围,想了解用法的请自行百度,这里仅展示一个线程池小Demo
该线程池的作用为:使用线程池执行100000个线程,线程任务实现的是callable接口,每个线程打印相应的逻辑,然后返回(此处线程池并没有对返回的结果进行接收)
public class TestThreadPool
public static void main(String[] args) throws ExecutionException, InterruptedException
CopyOnWriteArrayList<Future> retList = new CopyOnWriteArrayList<>();
List<Task> taskList = new ArrayList<>();
for (int i = 0; i < 100000; i++)
taskList.add(new Task(i));
ExecutorService threadPool = new ThreadPoolExecutor(
1,//核心的线程数量
3,//最大的线程数量
10,//等待一定事件后关闭最大线程
TimeUnit.MILLISECONDS,//等待时间的单位
new LinkedBlockingQueue<>(10),//创建一个队列
Executors.defaultThreadFactory(),//创建线程的线程工厂
new ThreadPoolExecutor.CallerRunsPolicy());// 拒绝策略
for (int i = 0; i < taskList.size(); i++)
threadPool.submit(taskList.get(i));
class Task implements Callable<Integer>
private Integer num;
public Task(Integer num)
this.num = num;
@Override
public Integer call() throws Exception
System.out.println("---uuu--" + num);
return num;
三大方法
//1.创建一个只有一个线程的线程池
ExecutorService threadPool = Executors.newSingleThreadExecutor();
//2.创建一个可伸缩的线程池
ExecutorService threadPool = Executors.newCachedThreadPool();
//3.创建一个指定最大数量的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
七大参数
ExecutorService threadPool = new ThreadPoolExecutor(
1,//核心的线程数量
3,//最大的线程数量
10,//等待一定事件后关闭最大线程
TimeUnit.MILLISECONDS,//等待时间的单位
new LinkedBlockingQueue<>(10),//创建一个队列,存放任务
Executors.defaultThreadFactory(),//创建线程的线程工厂
new ThreadPoolExecutor.CallerRunsPolicy());// 拒绝策略
四种拒绝策略
//多出来的线程,直接抛出异常
new ThreadPoolExecutor.AbortPolicy()
//谁开启的这个线程,就让这个线程返回给谁执行。比如main线程开启的,那就返回给main线程执行
new ThreadPoolExecutor.CallerRunsPolicy()
//如果队列线程数量满了以后,直接丢弃,不抛出异常
new ThreadPoolExecutor.DiscardPolicy()
//队列满了以后,尝试去和最早的线程竞争,也不会抛出异常
new ThreadPoolExecutor.DiscardOldestPolicy
二、execute流程简单分析
1、调用execute方法,submit就是在execute的方法上套了一层RunnableFuture。(下面代码中的workQueue就是我们初始化线程池的时候传入的队列)
2、addWorker方法。该方法主要看关注传入的参数firskTask,即我们传入的线程任务,也就是execute方法的参数
3、它将我们传入的线程封装为一个worker对象。将线程任务firskTask赋值给firstTask属性,将worker对象赋值给thread属性。(对应的getThreadFactory方法就是调用我们初始化线程池的时候的线程工厂去创建)
4、回到第二步的代码中,除了将我们的线程任务添加到workers集合中,还调用了t变量的start方法。t变量就是第三步的thread变量,即使我们的worker对象
5、调用worker对象的start方法。由于Worker实现了Runnable接口,所以start方法对应的就是调用Runnable接口的run方法
6、然后调用对应的runWorker方法。在该方法中,拿到worker对象的firstTask属性,即我们前文execute方法的参数,也即是我们的线程任务。然后调用线程任务的run 方法
简单理一下这个流程,我们可以得到如下结论:
- 当我们使用线程池执行线程任务的时候,我们执行线程的时候调用的使run方法,并非调用start方法的形式来开启多线程
- 线程池将我们传入的线程任务封装成一个worker对象,在这个过程中是有调用这个封装的worker对象的start方法
三、线程出现异常,后续流程源码
1、还是以execute方法为入口,然后直接跳转到上面第二小节的第六点,进入runWorker方法。run方法发生异常,会进入进入后面的processWorkerExit方法。传入的参数w是worker对象
2、在该方法中,首先将worker对象从workers队列中移除(前面有说到workers这个队列中存放的是所有任务),然后添加一个firstTask为null的任务。这里又会调用addWorker方法,但是由于firstTask是null,所以就没有对应的线程任务可以执行
总结:即如果使用线程池在执行多个线程任务的时候,其中一个线程发生异常。那么线程池会捕获这个发生异常的线程,然后将这个线程从线程任务队列中移除(workers)。再添加一个任务为null的线程任务,再执行这个null任务。也可以变相的理解为我们要执行的线程任务,被丢弃了。
四、拒绝策略的源码
我相信你在看了源码之后,就能够很简单的记住,它们每种拒绝策略的作用
使用拒绝策略的时候会调用这个reject方法,对应的handler在初始化线程池的时候会传入具体的实现
AbortPolicy:直接抛出异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
CallerRunsPolicy:交由调用它的线程执行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
if (!e.isShutdown())
r.run();
DiscardOldestPolicy:弹出队列(poll)中的其他线程任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
if (!e.isShutdown())
e.getQueue().poll();
e.execute(r);
DiscardPolicy:什么操作都不执行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
自定义拒绝策略
自定义拒绝策略只需要实现RejectedExecutionHandler接口,再重写rejectedExecution方法即可。我们可以将任务写到类似mq的中间件,或者持久化到数据库。再编写一个线程去不断监控线程池的队列情况,如果发现队列中任务下降到一定的阈值,那么我们就可以读取之前任务,将他们再次存放到队列中。
以上是关于浅谈ThreadPoolExecutor线程池底层源码的主要内容,如果未能解决你的问题,请参考以下文章
高并发中,那些不得不说的线程池与ThreadPoolExecutor类