Executor Framework分析 ScheduledThreadPoolExecutor
Posted ZhangJianIsAStark
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Executor Framework分析 ScheduledThreadPoolExecutor相关的知识,希望对你有一定的参考价值。
本篇博客分析下ScheduledThreadPoolExecutor的源码。
ScheduledThreadPoolExecutor继承ThreadPoolExecutor,
实现ScheduledExecutorService接口。
在普通线程池的基础上,增加了延迟、周期性执行任务等能力。
一、schedule
我们先来看一个延迟执行任务的接口源码:
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit)
if (command == null || unit == null)
throw new NullPointerException();
//decorateTask函数,默认返回传入的参数ScheduledFutureTask
//这个可以由子类复写,实现特定的需求
RunnableScheduledFuture<Void> t = decorateTask(command,
//ScheduledFutureTask为ScheduledThreadPoolExecutor中定义的内部类
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit),
sequencer.getAndIncrement()));
//延迟执行
delayedExecute(t);
return t;
我们再来看一个周期性执行任务的接口源码:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit)
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0L)
throw new IllegalArgumentException();
//同样创建一个ScheduledFutureTask, 只不过参数多了一些
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period),
sequencer.getAndIncrement());
//也调用一下decorateTask
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//最后延迟执行
delayedExecute(t);
return t;
通过对比schedule和scheduleAtFixedRate,不难发现:两个函数的实现几乎一致,
同样是利用参数构建出ScheduledFutureTask,然后调用delayedExecute延迟处理。
不同的地方是:两个函数构建ScheduledFutureTask时的参数有差别,
scheduleAtFixedRate构建ScheduledFutureTask时,多传入了一个参数作为周期。
二、delayedExecute
现在我们继续跟进delayedExecute函数:
private void delayedExecute(RunnableScheduledFuture<?> task)
//shutdown时,拒绝task
if (isShutdown())
reject(task);
else
//将任务加入到队列中
super.getQueue().add(task);
//加入队列后,需再次判断是否shutdown
if (isShutdown() &&
//判断shutdown时,是否运行周期或延迟的任务
//默认情况下,shutdown后可以运行延迟执行的任务,不能再运行周期性任务
//可以通过接口改变默认行为
!canRunInCurrentRunState(task.isPeriodic()) &&
//无法执行任务时,则移除并取消
remove(task))
task.cancel(false);
else
//确保任务开始执行
ensurePrestart();
从上述代码来看,delayedExecute主要的作用是:
判断任务是否应该被加入到工作队列中,以及是否应该执行。
继续看一下ensurePrestart的代码:
void ensurePrestart()
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
从代码不难看出,ensurePrestart原来就是在满足条件的情况下,创建工作线程Worker。
根据前面博客Executor Framework分析 (二) ThreadPoolExecutor主要参数分析,
我们知道addWorker成功后,Worker线程就会启动,然后源源不断地从WorkQueue取出需要执行的任务,并调用对应的run方法。
三、add、take与poll
根据前文我们知道,当任务到来时,将调用add方法,将任务添加到线程池对应的WorkQueue中。
同时,根据博客Executor Framework分析 (二) ThreadPoolExecutor主要参数分析,
我们知道从WorkQueue中取出要执行的任务时,需要考虑线程池是否设置了生存时间。
如果设置了生存时间,将会调用WorkQueue的带超时时间的poll函数;
如果未设置生存时间,将会调用WorkQueue的take函数。
对于ScheduledThreadPoolExecutor而言,
对应的WorkQueue默认为内部定义的DelayedWorkQueue,
我们看一下其中一个构造函数的实现:
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler)
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
//WorkQueue指定为DelayedWorkQueue
new DelayedWorkQueue(), handler);
因此,这部分我们来看看DelayedWorkQueue实现的add、take与poll方法。
3.1 add
public boolean add(Runnable e)
//实际上是offer函数在起作用
return offer(e);
public boolean offer(Runnable x)
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try
//size为队列当前保存数据的数量
int i = size;
//queue.length为当前的容量
if (i >= queue.length)
//超出容量时,增加
grow();
//递增数量
size = i + 1;
//第一个任务,直接加入
if (i == 0)
queue[0] = e;
setIndex(e, 0);
else
//将任务插入到合适的位置
siftUp(i, e);
//这里处理的情况类似于:
//take获取task时, 队列为空,于是阻塞
//新的任务加入后,就会通知,解除阻塞
//同时这里也涉及到Leader-Follower模式
//后面分析take函数时,再分析
if (queue[0] == e)
leader = null;
available.signal();
finally
lock.unlock();
return true;
上面的代码比较容易理解,唯一需要进一步分析的应该就是siftUp:
//k记录最后存储的位置
private void siftUp(int k, RunnableScheduledFuture<?> key)
while (k > 0)
//相当于折半
int parent = (k - 1) >>> 1;
//获取折半序号对应的任务
RunnableScheduledFuture<?> e = queue[parent];
//compareTo比的应该是triggerTime
//key的时间大于parent,则找到对应的位置,结束循环
if (key.compareTo(e) >= 0)
break;
//若key的时间小parent,则将parent放到当前的位置k
queue[k] = e;
setIndex(e, k);
//更新k值
k = parent;
//找到最终的k值后,保存key
queue[k] = key;
setIndex(key, k);
可以看出siftUp函数实际上就是按照时间顺序,
将新加入的事件尽可能地插入到队列中合适的位置。
不过从上述代码来看,调用siftUp后,
队列中事件的并不是严格的按时间顺序排列。
在这一部分的最后,我们看看ScheduledFutureTask对应的compareTo函数:
public int compareTo(Delayed other)
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask)
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
//time记录的就是ScheduledFutureTask的triggerTime
long diff = time - x.time;
//当前对象的time小于比较的对象时,返回-1
//与上文的分析一致
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
//时间相等时,按sequenceNumber来比较
//在ScheduledThreadPoolExecutor中,后进入的Task的sequenceNumber较大
//即时间相等时,按FIFO顺序处理
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
//非ScheduledFutureTask之间的比较时,仅将triggerTime作为依据
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
3.2 take
了解完add函数后,我们来看看take函数的源码:
public RunnableScheduledFuture<?> take() throws InterruptedException
//靠DelayedQueue中的锁来支撑并发
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
for (;;)
//取出当前队列的第一个任务
RunnableScheduledFuture<?> first = queue[0];
//当前没有任务,一直await
//直到前文的add加入新的任务
if (first == null)
available.await();
else
//有任务需要处理时,获取对应的等待时间
//即triggerTime与当前时间的差值
long delay = first.getDelay(NANOSECONDS);
//到了执行时间,则调用finishPoll,返回任务
if (delay <= 0L)
return finishPoll(first);
//以下处理未到执行时间的情况
first = null; // don't retain ref while waiting
//leader记录DelayedQueue中处理队列头部任务的线程
//采用Leader-Follower模式,减小不必要的竞争
if (leader != null)
//若已存在处理队列头部任务的线程,则当前线程一直等待
available.await();
else
//否则当前线程成为leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try
//等待delay时间自动唤醒
//或add时有新的任务被加入到队列头部
available.awaitNanos(delay);
finally
//被唤醒时,释放leader
//若等待delay被唤醒,则其它线程还处于await状态
//当前线程会调用finishPoll
//若新的任务被加入到队列头部,则会重新选择leader
if (leader == thisThread)
leader = null;
finally
//结束时,再必要时通知其它线程成为leader
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
take函数是一个阻塞函数,在DelayedQueue中,
队列头部任务的执行时间达到时,take才会返回结果。
由于DelayedQueue可能被并发访问,在take函数阻塞时,持有的锁将被释放掉,
导致多个线程可能同时take等待。
为此DelayedQueue引入了Leader-Follower模式。
DelayedQueue利用Leader记录处理队列头部任务的线程。
当某个线程成为Leader时,仅需要等待头部任务对应的delay时间,
但其它线程将无限等待,因此可以减少不必要的竞争开销。
当Leader线程返回结果后,则通知其它线程退出等待,重新竞争Leader。
当有新的任务被加入到队列头部时,旧有的leader就失效了,所有的线程重新竞争leader。
因此所有的线程在等待后,都有可能得到或失去leader权。
在这一部分的最后,看一下finishPoll函数:
//返回传入的参数
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f)
//由于队列头部的任务被取出
//因此需要重新调整队列中的数据
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
//将当前的队尾元素插入到合适的位置
siftDown(0, x);
setIndex(f, -1);
return f;
跟进下siftDown函数:
//k记录最后存储数据的位置,初始为0
private void siftDown(int k, RunnableScheduledFuture<?> key)
//找到中间的位置
int half = size >>> 1;
//队尾的任务,最少被移动到队列中间
//同时对中间前半部分的任务重排序
while (k < half)
//决定child的位置
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
//选择连续两个中,triggerTime小的
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
//key的triggerTime小于c, 退出循环
if (key.compareTo(c) <= 0)
break;
//否则将c移动到k的位置
queue[k] = c;
setIndex(c, k);
//增加k值
k = child;
queue[k] = key;
setIndex(key, k);
3.3 poll
了解take的源码后,poll的代码就比较好理解了,
我们主要看看有超时等待的poll函数:
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException
//等待的极限时间
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
for (;;)
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
//poll不是无限阻塞的,
//若队列为空,且极限等待时间小于等于0,则直接返回null
if (nanos <= 0L)
return null;
else
//队列为空,但极限时间大于0,则阻塞一段时间
//被唤醒后剩余的等待时间
nanos = available.awaitNanos(nanos);
else
//获取到任务后,同样判断任务的时间是否合适
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
//时间满足条件后,同样调用finishPoll
return finishPoll(first);
//拿到任务但还需等待执行,
//极限极限等待时间小于等于0,直接返回null
if (nanos <= 0L)
return null;
//否则进入等待时间
first = null; // don't retain ref while waiting
//极限等待时间小于任务的delay,或当前存在leader线程
//则等待nanos时间
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else
//极限等待时间大于任务delay且leader为null
//那么当前线程成为leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try
//最多等待delay时间
long timeLeft = available.awaitNanos(delay);
//更新剩余的极限等待时间
nanos -= delay - timeLeft;
finally
if (leader == thisThread)
leader = null;
finally
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
从逻辑来看,poll与take函数基本一致,只是需要额外考虑极限等待时间。
由于存在极限等待时间,因此调用poll函数时,leader机制带来的效果并不明显。
四、ScheduledFutureTask的执行
Worker线程从队列中取出任务后,最终将调用任务的run方法。
对于ScheduledThreadPoolExecutor来说,实际执行的任务类型为ScheduledFutureTask,
因此这一部分我们看看对应的run方法:
public void run()
//判断当前任务是否周期性执行
boolean periodic = isPeriodic();
//根据线程池当前的状态,判断能否执行任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
//非周期函数直接执行,调用父类FutureTask的run方法
super.run();
//对于周期性任务,本次执行成功后,安排下一次
else if (super.runAndReset())
//计算下一次执行的时间
setNextRunTime();
//将新的任务重新加入的队列中执行
reExecutePeriodic(outerTask);
reExecutePeriodic的代码如下:
void reExecutePeriodic(RunnableScheduledFuture<?> task)
//逻辑与前文的delayedExecute比较类似
if (canRunInCurrentRunState(true))
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
五、总结
至此,ScheduledThreadPoolExecutor的主要功能分析完毕。
从代码来看,ScheduledThreadPoolExecutor主要通过DelayedQueue来实现它的核心功能。
了解DelayedQueue的add和take函数后,基本就能搞清楚其延迟执行任务的原理。
至于周期性任务,不过是每次执行完毕后,更改任务的执行时间,
然后重新将任务添加到队列中。
以上是关于Executor Framework分析 ScheduledThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章
Executor Framework分析 ScheduledThreadPoolExecutor
Executor Framework分析 ScheduledThreadPoolExecutor
Executor Framework分析 ThreadPoolExecutor部分函数分析
Executor Framework分析 ThreadPoolExecutor部分函数分析