JUC源码解析ScheduledThreadPoolExecutor

Posted 林城画序

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC源码解析ScheduledThreadPoolExecutor相关的知识,希望对你有一定的参考价值。

简介

它是一个线程池执行器(ThreadPoolExecutor),在给定的延迟(delay)后执行。在多线程或者对灵活性有要求的环境下,要优于java.util.Timer。

提交的任务在执行之前支持取消,默认情况下,在延迟到来之前,不会自动从队列中删除,但可以设置,使其立刻从队列中移除。

有两种模式,固定频率(scheduleAtFixedRate)和固定延迟(scheduleWithFixedDelay),不管哪种模式,同一个任务不会被叠加执行,即便是不同的线程执行同一个任务。

继承ThreadPoolExecutor,维护一个固定大小的线程池和一个无界延迟队列(delay queue)。

ScheduledFutureTask,用来描述要执行的任务,DelayedWorkQueue,则是装在这些任务的delay queue.

 

固定频率

一个任务,从第一次开始执行的时间点开始,每隔一定的时间执行一次,如果执行的时间大于间隔时间,则要等这次执行结束,再执行下一次。

如上图所示,蓝色表示任务执行,白色表示间隔时间。

 

固定延迟

一个任务,每一次执行结束之后,延迟一定的时间,执行下一次。

 

 如上图所示,蓝色表示任务执行,白色表示间隔时间。

 

源码分析

属性

1     private volatile boolean continueExistingPeriodicTasksAfterShutdown; // shut down之后,是否取消period任务
2 
3     private volatile boolean executeExistingDelayedTasksAfterShutdown = true; // shut down之后,是否取消non-period任务
4 
5     private volatile boolean removeOnCancel = false; // cancel后,是否从队列里移除此任务
6 
7     private static final AtomicLong sequencer = new AtomicLong(); // 给任务编号

 

ScheduledFutureTask

属性

1         private final long sequenceNumber; // 序列编号
2 
3         private long time; // 执行时间
4 
5         private final long period; // 周期,正值:固定频率;负值:固定延迟;0:不重复执行
6 
7         RunnableScheduledFuture<V> outerTask = this; // 实际任务
8 
9         int heapIndex; // 堆索引

 

构造方法

 1         ScheduledFutureTask(Runnable r, V result, long ns) {
 2             super(r, result);
 3             this.time = ns;
 4             this.period = 0; // 不重复执行
 5             this.sequenceNumber = sequencer.getAndIncrement();
 6         }
 7 
 8         ScheduledFutureTask(Runnable r, V result, long ns, long period) {
 9             super(r, result);
10             this.time = ns;
11             this.period = period;
12             this.sequenceNumber = sequencer.getAndIncrement();
13         }
14 
15         ScheduledFutureTask(Callable<V> callable, long ns) {
16             super(callable);
17             this.time = ns;
18             this.period = 0;
19             this.sequenceNumber = sequencer.getAndIncrement();
20         }

 

关键方法

getDelay(TimeUnit)

1         public long getDelay(TimeUnit unit) {
2             return unit.convert(time - now(), NANOSECONDS);
3         }

 

compareTo(Delayed other)

 1         public int compareTo(Delayed other) { // 根据延迟比较元素,在延迟队列中,延迟越小越靠前,延迟最小的在队首,最先出队被执行
 2             if (other == this)
 3                 return 0;
 4             if (other instanceof ScheduledFutureTask) {
 5                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
 6                 long diff = time - x.time;
 7                 if (diff < 0)
 8                     return -1;
 9                 else if (diff > 0)
10                     return 1;
11                 else if (sequenceNumber < x.sequenceNumber)
12                     return -1;
13                 else
14                     return 1;
15             }
16             long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
17             return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
18         }

 

setNextRunTime()

1         private void setNextRunTime() { // 设置下一次执行时间
2             long p = period;
3             if (p > 0) // 固定频率,从第一次时间点,每次加period
4                 time += p;
5             else
6                 time = triggerTime(-p); // 固定延迟,每次执行结束后,加period作为下一次执行时间
7         }

 

cancel(boolean mayInterruptIfRunning)

1         public boolean cancel(boolean mayInterruptIfRunning) { // 取消任务
2             boolean cancelled = super.cancel(mayInterruptIfRunning);
3             if (cancelled && removeOnCancel && heapIndex >= 0)
4                 remove(this);
5             return cancelled;
6         }

 

run()

 1         public void run() { // 执行任务
 2             boolean periodic = isPeriodic();
 3             if (!canRunInCurrentRunState(periodic))
 4                 cancel(false);
 5             else if (!periodic)
 6                 ScheduledFutureTask.super.run(); // 单次执行
 7             else if (ScheduledFutureTask.super.runAndReset()) { // 周期执行,runAndReset
 8                 setNextRunTime(); // 设置下次执行时间
 9                 reExecutePeriodic(outerTask); // 重新加入队列
10             }
11         }

 

构造方法

 1     public ScheduledThreadPoolExecutor(int corePoolSize) {
 2         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
 3     }
 4 
 5     public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
 6         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
 7     }
 8 
 9     public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
10         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);
11     }
12 
13     public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory,
14             RejectedExecutionHandler handler) {
15         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
16     }

 

关键方法

scheduleAtFixedRate

 1     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { // 固定频率
 2         if (command == null || unit == null)
 3             throw new NullPointerException();
 4         if (period <= 0)
 5             throw new IllegalArgumentException();
 6         ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit),
 7                 unit.toNanos(period)); // period 大于 0
 8         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 9         sft.outerTask = t;
10         delayedExecute(t);
11         return t;
12     }

 

scheduleWithFixedDelay

 1     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { // 固定延迟
 2         if (command == null || unit == null)
 3             throw new NullPointerException();
 4         if (delay <= 0)
 5             throw new IllegalArgumentException();
 6         ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit),
 7                 unit.toNanos(-delay)); // -delay 小于 0
 8         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 9         sft.outerTask = t;
10         delayedExecute(t);
11         return t;
12     }

 

delayedExecute(RunnableScheduledFuture<?> task)

 1     private void delayedExecute(RunnableScheduledFuture<?> task) {
 2         if (isShutdown()) // 如果线程池已经shut down,则拒绝任务
 3             reject(task);
 4         else {
 5             super.getQueue().add(task); // 否则,添加任务到延迟队列
 6             if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
 7                 task.cancel(false); // 根据run-after-shutdown参数,决定是否取任务
 8             else
 9                 ensurePrestart(); // 保证线程启动
10         }
11     }

 

reExecutePeriodic(RunnableScheduledFuture<?> task)

1     void reExecutePeriodic(RunnableScheduledFuture<?> task) { // 周期性任务重新入队,策略同delayedExecute
2         if (canRunInCurrentRunState(true)) {
3             super.getQueue().add(task);
4             if (!canRunInCurrentRunState(true) && remove(task))
5                 task.cancel(false);
6             else
7                 ensurePrestart();
8         }
9     }

 

ensurePrestart()

1     void ensurePrestart() {
2         int wc = workerCountOf(ctl.get());
3         if (wc < corePoolSize)
4             addWorker(null, true);
5         else if (wc == 0)
6             addWorker(null, false);
7     }

该方法在ThreadPoolExecutor类,保证线程池中至少有一个活动线程。

 

triggerTime()

 1     long triggerTime(long delay) { // 返回延迟动作的触发时间
 2         return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
 3     }
 4 
 5     private long overflowFree(long delay) { // 处理溢出情况
 6         Delayed head = (Delayed) super.getQueue().peek();
 7         if (head != null) {
 8             long headDelay = head.getDelay(NANOSECONDS);
 9             if (headDelay < 0 && (delay - headDelay < 0))
10                 delay = Long.MAX_VALUE + headDelay;
11         }
12         return delay;
13     }

 

DelayedWorkQueue

属性

1         private static final int INITIAL_CAPACITY = 16; // 初始容量
2         private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; // 堆,充当优先级队列
3         private final ReentrantLock lock = new ReentrantLock(); // 可重入锁
4         private int size = 0;
5         private Thread leader = null; // 领导者线程
6         private final Condition available = lock.newCondition(); // 条件队列

 

关键方法

以下这些方法的解释可参考前两篇文章,【JUC源码解析】DelayQueue【JUC源码解析】PriorityBlockingQueue

siftUp

 1         private void siftUp(int k, RunnableScheduledFuture<?> key) { // 向上调整,同
 2             while (k > 0) {
 3                 int parent = (k - 1) >>> 1;
 4                 RunnableScheduledFuture<?> e = queue[parent];
 5                 if (key.compareTo(e) >= 0)
 6                     break;
 7                 queue[k] = e;
 8                 setIndex(e, k);
 9                 k = parent;
10             }
11             queue[k] = key;
12             setIndex(key, k);
13         }

 

siftDown

 1         private void siftDown(int k, RunnableScheduledFuture<?> key) { // 向下调整
 2             int half = size >>> 1;
 3             while (k < half) {
 4                 int child = (k << 1) + 1;
 5                 RunnableScheduledFuture<?> c = queue[child];
 6                 int right = child + 1;
 7                 if (right < size && c.compareTo(queue[right]) > 0)
 8                     c = queue[child = right];
 9                 if (key.compareTo(c) <= 0)
10                     break;
11                 queue[k] = c;
12                 setIndex(c, k);
13                 k = child;
14             }
15             queue[k] = key;
16             setIndex(key, k);
17         }

 

grow

1         private void grow() { // 扩容
2             int oldCapacity = queue.length;
3             int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
4             if (newCapacity < 0) // overflow
5                 newCapacity = Integer.MAX_VALUE;
6             queue = Arrays.copyOf(queue, newCapacity);
7         }

 

offer

 1         public boolean offer(Runnable x) {
 2             if (x == null)
 3                 throw new NullPointerException();
 4             RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
 5             final ReentrantLock lock = this.lock;
 6             lock.lock();
 7             try {
 8                 int i = size;
 9                 if (i >= queue.length)
10                     grow();
11                 size = i + 1;
12                 if (i == 0) {
13                     queue[0] = e;
14                     setIndex(e, 0);
15                 } else {
16                     siftUp(i, e);
17                 }
18                 if (queue[0] == e) {
19                     leader = null;
20                     available.signal();
21                 }
22             } finally {
23                 lock.unlock();
24             }
25             return true;
26         }

 

take

 1         public RunnableScheduledFuture<?> take() throws InterruptedException {
 2             final ReentrantLock lock = this.lock;
 3             lock.lockInterruptibly();
 4             try {
 5                 for (;;) {
 6                     RunnableScheduledFuture<?> first = queue[0];
 7                     if (first == null)
 8                         available.await();
 9                     else {
10                         long delay = first.getDelay(NANOSECONDS);
11                         if (delay <= 0)
12                             return finishPoll(first);
13                         first = null; // don\'t retain ref while waiting
14                         if (leader != null)
15                             available.await();
16                         else {
17                             Thread thisThread = Thread.currentThread();
18                             leader = thisThread;
19                             try {
20                                 available.awaitNanos(delay);
21                             } finally {
22                                 if (leader == thisThread)
23                                     leader = null;
24                             }
25                         }
26                     }
27                 }
28             } finally {
29                 if (leader == null && queue[0] != null)
30                     available.signal();
31                 lock.unlock();
32             }
33         }

 

 

行文至此结束。

 

尊重他人的劳动,转载请注明出处:http://www.cnblogs.com/aniao/p/aniao_stpe.html

以上是关于JUC源码解析ScheduledThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章

JUC源码解析ScheduledThreadPoolExecutor

JUC源码解析Phaser

JUC源码解析CompletableFuture

JUC源码解析CountDownLatch

JUC源码解析ConcurrentLinkedQueue

JUC长征篇之ReentrantLock源码解析