ScheduledThreadPoolExecutor源码解析
Posted miaomiaoLoveCode
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ScheduledThreadPoolExecutor源码解析相关的知识,希望对你有一定的参考价值。
ScheduledThreadPoolExecutor主要用来定期执行任务,或者是在给定的延迟之后运行任务。它的功能与Timer类似,但是比起Timer,ScheduledThreadPoolExecutor功能更强大,使用也更灵活。
ScheduledThreadPoolExecutor与Timer区别:
- Timer对应单个后台线程,所有的任务都由同一个线程调度,因此所有的任务都是串行执行的,前一个任务的延迟或者异常都将会影响到之后的任务;
- ScheduledThreadPoolExecutor对应多个后台线程,每一个调度的任务都将由线程池中的一个线程去执行,在同一时刻,任务并发执行,并且它们之间不会相互干扰。
ScheduledThreadPoolExecutor源码分析
在开始分析源码具体实现之前,先给一个简单的ScheduledThreadPoolExecutor使用案例:
1. ScheduledThreadPoolExecutor声明;
2. 调用scheduleAtFixedRate方法和scheduleWithFixedDelay方法提交定时任务task。
类声明
在看构造方法之前先来看看ScheduledThreadPoolExecutor类声明:
从ScheduledThreadPoolExecutor类声明可以看出:
- ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,并且实现了接口ScheduledExecutorService;
- ScheduledThreadPoolExecutor是另外一种线程池,它同ThreadPoolExecutor拥有相同的特性,但是又略有不同,具体的不同之处会在后文做详细的介绍。
构造方法
ScheduledThreadPoolExecutor提供3个构造方法以供使用者使用:
从构造方法可以看出,ScheduledThreadPoolExecutor使用DelayQueue来作为线程池的工作队列,由于DelayQueue是无界队列,根据线程池的工作原理,核心参数maximumPoolSize在ScheduledThreadPoolExecutor中是没有什么意义的。
总的来说,ScheduledThreadPoolExecutor为了实现周期性执行任务,对ThreadPoolExecutor做了以下改动:
- 工作队列使用DelayQueue;
- 任务提交之后统统都进工作队列;
- 获取任务的方式改变,执行了任务之后,也增加了额外的处理,具体的改变后文会一一给出详细的分析。
任务提交与调度
ScheduledThreadPoolExecutor中最常用的任务提交的方法有两个:ScheduleAtFixedRate方法和ScheduleWithFixedDelay方法。
具体的执行流程如下:
1. 参数校验,不合法参数抛出异常;
2. 构造task;
3. 调用delayedExecute方法进行后续相关处理。
接下来我们先来分析ScheduledThreadPoolExecutor的调度任务的最小单位ScheduledFutureTask相关实现。
ScheduledFutureTask
- 成员变量
ScheduledFutureTask包含3个成员变量:
1. sequenceNumber:任务被添加到ScheduledThreadPoolExecutor中的序号;
2. time:任务将要被执行的具体时间;
3. period:任务执行的间隔周期。
ScheduledThreadPoolExecutor会把待执行的任务放到工作队列DelayQueue中,DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的ScheduledFutureTask进行排序,具体的排序算法实现如下:
compareTo实现
首先按照time排序,time小的排在前面,time大的排在后面;
如果time相同,按照sequenceNumber排序,sequenceNumber小的排在前面,sequenceNumber大的排在后面,换句话说,如果两个task的执行时间相同,优先执行先提交的task。
任务调度之run方法实现
run方法是调度task的核心,task的执行实际上是run方法的执行。
具体执行流程如下:- 步骤1:判断当前task是否可以执行,如果不能执行,调用cancel方法取消task执行,否则,跳转到步骤2;
- 步骤2:判断当前task是否到达执行时间点,如果到达,执行该task,否则跳转到步骤3;
- 步骤3:重置状态,计算任务下次执行时间,重新把任务添加到工作队列中,让该任务可重复执行。
看完task之后,我们接下看看看构造好task之后的delayedExecute方法相关实现。
* delayedExecute实现*
delayedExecute方法主要完成了这些操作:
1. 将task添加到工作队列;
2. 调用ensurePrestart()方法做预处理,该方法实现在线程池一文中做过详细分析在这里就不再做赘述了。
任务调度小结
到这里为止,ScheduledThreadPoolExecutor的task执行过程可以总结为下图:
- 步骤1:线程1从工作队列DelayQueue中获取已到期的task;
- 步骤2:线程1执行该task;
- 步骤3:线程1修改ScheduledFutureTask的time变量为下次被执行的时间;
- 步骤4:线程1将修改后的task重新放回DelayQueue中。
接下来我们来详细看看各个步骤DelayQueue具体相关实现。
任务获取 - take实现
具体执行步骤如下:
1. 获取Lock;
2. 从优先队列中获取任务:
- 步骤1:如果PriorityQueue为空,当前线程到Condition中等待,否则执行步骤2;
- 步骤2:如果PriorityQueue的第一个元素的时间比当前时间小,获取该任务,否则,线程到Condition中等待;
- 步骤3:获取任务成功后,如果PriorityQueue不为空,唤醒等待在Condition中的所有线程。
3. 释放Lock。
ScheduledThreadPoolExecutor的getTask()方法会无限循环获取task,直到线程从PriorityQueue中获取到一个元素后,才会退出无限循环。
任务添加 - add实现
add方法的核心是offer方法调用,我们来看看offer方法的具体实现:
具体执行步骤如下:
1. 获取Lock;
2. 添加任务:
- 向PriorityQueue添加任务;
- 如果待添加的任务是PriorityQueue的第一个元素,唤醒在Condition中等待的所有线程。
3. 释放Lock。
以上是关于ScheduledThreadPoolExecutor源码解析的主要内容,如果未能解决你的问题,请参考以下文章