ScheduledThreadPoolExecutor源码解析

Posted miaomiaoLoveCode

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ScheduledThreadPoolExecutor源码解析相关的知识,希望对你有一定的参考价值。

ScheduledThreadPoolExecutor主要用来定期执行任务,或者是在给定的延迟之后运行任务。它的功能与Timer类似,但是比起Timer,ScheduledThreadPoolExecutor功能更强大,使用也更灵活。

ScheduledThreadPoolExecutor与Timer区别:

  • Timer对应单个后台线程,所有的任务都由同一个线程调度,因此所有的任务都是串行执行的,前一个任务的延迟或者异常都将会影响到之后的任务;
  • ScheduledThreadPoolExecutor对应多个后台线程,每一个调度的任务都将由线程池中的一个线程去执行,在同一时刻,任务并发执行,并且它们之间不会相互干扰。

ScheduledThreadPoolExecutor源码分析

在开始分析源码具体实现之前,先给一个简单的ScheduledThreadPoolExecutor使用案例:

1. ScheduledThreadPoolExecutor声明;
2. 调用scheduleAtFixedRate方法和scheduleWithFixedDelay方法提交定时任务task。

类声明

在看构造方法之前先来看看ScheduledThreadPoolExecutor类声明:

从ScheduledThreadPoolExecutor类声明可以看出:

  • ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,并且实现了接口ScheduledExecutorService;
  • ScheduledThreadPoolExecutor是另外一种线程池,它同ThreadPoolExecutor拥有相同的特性,但是又略有不同,具体的不同之处会在后文做详细的介绍。
构造方法

ScheduledThreadPoolExecutor提供3个构造方法以供使用者使用:

从构造方法可以看出,ScheduledThreadPoolExecutor使用DelayQueue来作为线程池的工作队列,由于DelayQueue是无界队列,根据线程池的工作原理,核心参数maximumPoolSize在ScheduledThreadPoolExecutor中是没有什么意义的。

总的来说,ScheduledThreadPoolExecutor为了实现周期性执行任务,对ThreadPoolExecutor做了以下改动:

  • 工作队列使用DelayQueue;
  • 任务提交之后统统都进工作队列;
  • 获取任务的方式改变,执行了任务之后,也增加了额外的处理,具体的改变后文会一一给出详细的分析。
任务提交与调度

ScheduledThreadPoolExecutor中最常用的任务提交的方法有两个:ScheduleAtFixedRate方法和ScheduleWithFixedDelay方法。

具体的执行流程如下:
1. 参数校验,不合法参数抛出异常;
2. 构造task;
3. 调用delayedExecute方法进行后续相关处理。

接下来我们先来分析ScheduledThreadPoolExecutor的调度任务的最小单位ScheduledFutureTask相关实现。

ScheduledFutureTask
- 成员变量
ScheduledFutureTask包含3个成员变量:

1. sequenceNumber:任务被添加到ScheduledThreadPoolExecutor中的序号;
2. time:任务将要被执行的具体时间;
3. period:任务执行的间隔周期。

ScheduledThreadPoolExecutor会把待执行的任务放到工作队列DelayQueue中,DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的ScheduledFutureTask进行排序,具体的排序算法实现如下:

  • compareTo实现

    • 首先按照time排序,time小的排在前面,time大的排在后面;

    • 如果time相同,按照sequenceNumber排序,sequenceNumber小的排在前面,sequenceNumber大的排在后面,换句话说,如果两个task的执行时间相同,优先执行先提交的task。

  • 任务调度之run方法实现
    run方法是调度task的核心,task的执行实际上是run方法的执行。

    具体执行流程如下:

    • 步骤1:判断当前task是否可以执行,如果不能执行,调用cancel方法取消task执行,否则,跳转到步骤2;
    • 步骤2:判断当前task是否到达执行时间点,如果到达,执行该task,否则跳转到步骤3;
    • 步骤3:重置状态,计算任务下次执行时间,重新把任务添加到工作队列中,让该任务可重复执行。

看完task之后,我们接下看看看构造好task之后的delayedExecute方法相关实现。

* delayedExecute实现*

delayedExecute方法主要完成了这些操作:
1. 将task添加到工作队列;
2. 调用ensurePrestart()方法做预处理,该方法实现在线程池一文中做过详细分析在这里就不再做赘述了。

任务调度小结

到这里为止,ScheduledThreadPoolExecutor的task执行过程可以总结为下图:

  • 步骤1:线程1从工作队列DelayQueue中获取已到期的task;
  • 步骤2:线程1执行该task;
  • 步骤3:线程1修改ScheduledFutureTask的time变量为下次被执行的时间;
  • 步骤4:线程1将修改后的task重新放回DelayQueue中。

接下来我们来详细看看各个步骤DelayQueue具体相关实现。

任务获取 - take实现

具体执行步骤如下:
1. 获取Lock;
2. 从优先队列中获取任务:
- 步骤1:如果PriorityQueue为空,当前线程到Condition中等待,否则执行步骤2;
- 步骤2:如果PriorityQueue的第一个元素的时间比当前时间小,获取该任务,否则,线程到Condition中等待;
- 步骤3:获取任务成功后,如果PriorityQueue不为空,唤醒等待在Condition中的所有线程。
3. 释放Lock。

ScheduledThreadPoolExecutor的getTask()方法会无限循环获取task,直到线程从PriorityQueue中获取到一个元素后,才会退出无限循环。

任务添加 - add实现

add方法的核心是offer方法调用,我们来看看offer方法的具体实现:

具体执行步骤如下:
1. 获取Lock;
2. 添加任务:
- 向PriorityQueue添加任务;
- 如果待添加的任务是PriorityQueue的第一个元素,唤醒在Condition中等待的所有线程。
3. 释放Lock。

以上是关于ScheduledThreadPoolExecutor源码解析的主要内容,如果未能解决你的问题,请参考以下文章