Reading the J.U.C Concurrency Framework Source (14): ScheduledThreadPoolExecutor
Posted by stevenczp
Based on JDK version jdk1.7.0_80
java.util.concurrent.ScheduledThreadPoolExecutor
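As orientation before the source listing, here is a minimal usage sketch of the class's public API: one delayed one-shot task and one fixed-rate task. It uses only JDK calls; the class name `ScheduledUsageSketch`, the helper methods `runOnce` and `runPeriodically`, and all delay values are hypothetical choices made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the java.util.concurrent calls come from the JDK.
class ScheduledUsageSketch {

    /** Schedules a one-shot Callable with a short delay and returns its result. */
    static String runOnce() throws Exception {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            ScheduledFuture<String> f = pool.schedule(new Callable<String>() {
                public String call() { return "done"; }
            }, 20, TimeUnit.MILLISECONDS);
            // Bounded wait so the sketch cannot hang forever.
            return f.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    /** Runs a fixed-rate task until it has fired `runs` times, then cancels it. */
    static boolean runPeriodically(int runs) throws Exception {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        final CountDownLatch latch = new CountDownLatch(runs);
        try {
            ScheduledFuture<?> f = pool.scheduleAtFixedRate(new Runnable() {
                public void run() { latch.countDown(); }
            }, 0, 10, TimeUnit.MILLISECONDS);
            boolean finished = latch.await(5, TimeUnit.SECONDS);
            f.cancel(false); // a periodic task only stops when cancelled or on shutdown
            return finished && f.isCancelled();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOnce());
        System.out.println(runPeriodically(3));
    }
}
```

Anonymous classes are used instead of lambdas so the sketch also compiles on the JDK 7 version this article is based on.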
The code is as follows:
/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 */

/*
 *
 *
 *
 *
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
import java.util.*;

/**
 * A {@link ThreadPoolExecutor} that can additionally schedule
 * commands to run after a given delay, or to execute
 * periodically. This class is preferable to {@link java.util.Timer}
 * when multiple worker threads are needed, or when the additional
 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
 * this class extends) are required.
 *
 * <p>Delayed tasks execute no sooner than they are enabled, but
 * without any real-time guarantees about when, after they are
 * enabled, they will commence. Tasks scheduled for exactly the same
 * execution time are enabled in first-in-first-out (FIFO) order of
 * submission.
 *
 * <p>When a submitted task is cancelled before it is run, execution
 * is suppressed. By default, such a cancelled task is not
 * automatically removed from the work queue until its delay
 * elapses. While this enables further inspection and monitoring, it
 * may also cause unbounded retention of cancelled tasks. To avoid
 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
 * causes tasks to be immediately removed from the work queue at
 * time of cancellation.
 *
 * <p>Successive executions of a task scheduled via
 * {@code scheduleAtFixedRate} or
 * {@code scheduleWithFixedDelay} do not overlap. While different
 * executions may be performed by different threads, the effects of
 * prior executions <a
 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those of subsequent ones.
 *
 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
 * of the inherited tuning methods are not useful for it. In
 * particular, because it acts as a fixed-sized pool using
 * {@code corePoolSize} threads and an unbounded queue, adjustments
 * to {@code maximumPoolSize} have no useful effect. Additionally, it
 * is almost never a good idea to set {@code corePoolSize} to zero or
 * use {@code allowCoreThreadTimeOut} because this may leave the pool
 * without threads to handle tasks once they become eligible to run.
 *
 * <p><b>Extension notes:</b> This class overrides the
 * {@link ThreadPoolExecutor#execute execute} and
 * {@link AbstractExecutorService#submit(Runnable) submit}
 * methods to generate internal {@link ScheduledFuture} objects to
 * control per-task delays and scheduling. To preserve
 * functionality, any further overrides of these methods in
 * subclasses must invoke superclass versions, which effectively
 * disables additional task customization. However, this class
 * provides alternative protected extension method
 * {@code decorateTask} (one version each for {@code Runnable} and
 * {@code Callable}) that can be used to customize the concrete task
 * types used to execute commands entered via {@code execute},
 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
 * and {@code scheduleWithFixedDelay}. By default, a
 * {@code ScheduledThreadPoolExecutor} uses a task type extending
 * {@link FutureTask}. However, this may be modified or replaced using
 * subclasses of the form:
 *
 * <pre> {@code
 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
 *
 *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
 *
 *   protected <V> RunnableScheduledFuture<V> decorateTask(
 *                Runnable r, RunnableScheduledFuture<V> task) {
 *       return new CustomTask<V>(r, task);
 *   }
 *
 *   protected <V> RunnableScheduledFuture<V> decorateTask(
 *                Callable<V> c, RunnableScheduledFuture<V> task) {
 *       return new CustomTask<V>(c, task);
 *   }
 *   // ... add constructors, etc.
 * }}</pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {

    /*
     * This class specializes ThreadPoolExecutor implementation by
     *
     * 1. Using a custom task type, ScheduledFutureTask for
     *    tasks, even those that don't require scheduling (i.e.,
     *    those submitted using ExecutorService execute, not
     *    ScheduledExecutorService methods) which are treated as
     *    delayed tasks with a delay of zero.
     *
     * 2. Using a custom queue (DelayedWorkQueue), a variant of
     *    unbounded DelayQueue. The lack of capacity constraint and
     *    the fact that corePoolSize and maximumPoolSize are
     *    effectively identical simplifies some execution mechanics
     *    (see delayedExecute) compared to ThreadPoolExecutor.
     *
     * 3. Supporting optional run-after-shutdown parameters, which
     *    leads to overrides of shutdown methods to remove and cancel
     *    tasks that should NOT be run after shutdown, as well as
     *    different recheck logic when task (re)submission overlaps
     *    with a shutdown.
     *
     * 4. Task decoration methods to allow interception and
     *    instrumentation, which are needed because subclasses cannot
     *    otherwise override submit methods to get this effect. These
     *    don't have any impact on pool control logic though.
     */

    /**
     * False if should cancel/suppress periodic tasks on shutdown.
     */
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;

    /**
     * False if should cancel non-periodic tasks on shutdown.
     */
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

    /**
     * True if ScheduledFutureTask.cancel should remove from queue
     */
    private volatile boolean removeOnCancel = false;

    /**
     * Sequence number to break scheduling ties, and in turn to
     * guarantee FIFO order among tied entries.
     */
    private static final AtomicLong sequencer = new AtomicLong(0);

    /**
     * Returns current nanosecond time.
     */
    final long now() {
        return System.nanoTime();
    }

    private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

        /** Sequence number to break ties FIFO */
        private final long sequenceNumber;

        /** The time the task is enabled to execute in nanoTime units */
        private long time;

        /**
         * Period in nanoseconds for repeating tasks. A positive
         * value indicates fixed-rate execution. A negative value
         * indicates fixed-delay execution. A value of 0 indicates a
         * non-repeating task.
         */
        private final long period;

        /** The actual task to be re-enqueued by reExecutePeriodic */
        RunnableScheduledFuture<V> outerTask = this;

        /**
         * Index into delay queue, to support faster cancellation.
         */
        int heapIndex;

        /**
         * Creates a one-shot action with given nanoTime-based trigger time.
         */
        ScheduledFutureTask(Runnable r, V result, long ns) {
            super(r, result);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        /**
         * Creates a periodic action with given nano time and period.
         */
        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        /**
         * Creates a one-shot action with given nanoTime-based trigger.
         */
        ScheduledFutureTask(Callable<V> callable, long ns) {
            super(callable);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }

        public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
        }

        /**
         * Returns true if this is a periodic (not a one-shot) action.
         *
         * @return true if periodic
         */
        public boolean isPeriodic() {
            return period != 0;
        }

        /**
         * Sets the next time to run for a periodic task.
         */
        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }

        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            if (cancelled && removeOnCancel && heapIndex >= 0)
                remove(this);
            return cancelled;
        }

        /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
         */
        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }
    }

    /**
     * Returns true if can run a task given current run state
     * and run-after-shutdown parameters.
     *
     * @param periodic true if this task periodic, false if delayed
     */
    boolean canRunInCurrentRunState(boolean periodic) {
        return isRunningOrShutdown(periodic ?
                                   continueExistingPeriodicTasksAfterShutdown :
                                   executeExistingDelayedTasksAfterShutdown);
    }

    /**
     * Main execution method for delayed or periodic tasks.
     * If pool is shut down, rejects the task. Otherwise adds task to queue
     * and starts a thread, if necessary, to run it. (We cannot
     * prestart the thread to run the task because the task (probably)
     * shouldn't be run yet,) If the pool is shut down while the task
     * is being added, cancel and remove it if required by state and
     * run-after-shutdown parameters.
     *
     * @param task the task
     */
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

    /**
     * Requeues a periodic task unless current run state precludes it.
     * Same idea as delayedExecute except drops task rather than rejecting.
     *
     * @param task the task
     */
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

    /**
     * Cancels and clears the queue of all tasks that should not be run
     * due to shutdown policy. Invoked within super.shutdown.
     */
    @Override void onShutdown() {
        BlockingQueue<Runnable> q = super.getQueue();
        boolean keepDelayed =
            getExecuteExistingDelayedTasksAfterShutdownPolicy();
        boolean keepPeriodic =
            getContinueExistingPeriodicTasksAfterShutdownPolicy();
        if (!keepDelayed && !keepPeriodic) {
            for (Object e : q.toArray())
                if (e instanceof RunnableScheduledFuture<?>)
                    ((RunnableScheduledFuture<?>) e).cancel(false);
            q.clear();
        }
        else {
            // Traverse snapshot to avoid iterator exceptions
            for (Object e : q.toArray()) {
                if (e instanceof RunnableScheduledFuture) {
                    RunnableScheduledFuture<?> t =
                        (RunnableScheduledFuture<?>)e;
                    if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                        t.isCancelled()) { // also remove if already cancelled
                        if (q.remove(t))
                            t.cancel(false);
                    }
                }
            }
        }
        tryTerminate();
    }

    /**
     * Modifies or replaces the task used to execute a runnable.
     * This method can be used to override the concrete
     * class used for managing internal tasks.
     * The default implementation simply returns the given task.
     *
     * @param runnable the submitted Runnable
     * @param task the task created to execute the runnable
     * @return a task that can execute the runnable
     * @since 1.6
     */
    protected <V> RunnableScheduledFuture<V> decorateTask(
        Runnable runnable, RunnableScheduledFuture<V> task) {
        return task;
    }

    /**
     * Modifies or replaces the task used to execute a callable.
     * This method can be used to override the concrete
     * class used for managing internal tasks.
     * The default implementation simply returns the given task.
     *
     * @param callable the submitted Callable
     * @param task the task created to execute the callable
     * @return a task that can execute the callable
     * @since 1.6
     */
    protected <V> RunnableScheduledFuture<V> decorateTask(
        Callable<V> callable, RunnableScheduledFuture<V> task) {
        return task;
    }

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    /**
     * Creates a new ScheduledThreadPoolExecutor with the given
     * initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code handler} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }

    /**
     * Creates a new ScheduledThreadPoolExecutor with the given
     * initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} or
     *         {@code handler} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

    /**
     * Returns the trigger time of a delayed action.
     */
    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

    /**
     * Returns the trigger time of a delayed action.
     */
    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

    /**
     * Constrains the values of all delays in the queue to be within
     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
     * This may occur if a task is eligible to be dequeued, but has
     * not yet been, while some other task is added with a delay of
     * Long.MAX_VALUE.
     */
    private long overflowFree(long delay) {
        Delayed head = (Delayed) super.getQueue().peek();
        if (head != null) {
            long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
            if (headDelay < 0 && (delay - headDelay < 0))
                delay = Long.MAX_VALUE + headDelay;
        }
        return delay;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

    /**
     * Executes {@code command} with zero required delay.
     * This has effect equivalent to
     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
     * Note that inspections of the queue and of the list returned by
     * {@code shutdownNow} will access the zero-delayed
     * {@link ScheduledFuture}, not the {@code command} itself.
     *
     * <p>A consequence of the use of {@code ScheduledFuture} objects is
     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
     * called with a null second {@code Throwable} argument, even if the
     * {@code command} terminated abruptly. Instead, the {@code Throwable}
     * thrown by such a task can be obtained via {@link Future#get}.
     *
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution because the
     *         executor has been shut down
     * @throws NullPointerException {@inheritDoc}
     */
    public void execute(Runnable command) {
        schedule(command, 0, TimeUnit.NANOSECONDS);
    }

    // Override AbstractExecutorService methods

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        return schedule(task, 0, TimeUnit.NANOSECONDS);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        return schedule(Executors.callable(task, result),
                        0, TimeUnit.NANOSECONDS);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        return schedule(task, 0, TimeUnit.NANOSECONDS);
    }

    /**
     * Sets the policy on whether to continue executing existing
     * periodic tasks even when this executor has been {@code shutdown}.
     * In this case, these tasks will only terminate upon
     * {@code shutdownNow} or after setting the policy to
     * {@code false} when already shutdown.
     * This value is by default {@code false}.
     *
     * @param value if {@code true}, continue after shutdown, else don't.
     * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
     */
    public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
        continueExistingPeriodicTasksAfterShutdown = value;
        if (!value && isShutdown())
            onShutdown();
    }

    /**
     * Gets the policy on whether to continue executing existing
     * periodic tasks even when this executor has been {@code shutdown}.
     * In this case, these tasks will only terminate upon
     * {@code shutdownNow} or after setting the policy to
     * {@code false} when already shutdown.
     * This value is by default {@code false}.
     *
     * @return {@code true} if will continue after shutdown
     * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
     */
    public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
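The run-after-shutdown policies documented in the listing can be observed from outside the class. The following is a minimal sketch, assuming the default field values shown above (periodic tasks are dropped on `shutdown`, delayed one-shot tasks are kept). The class name `ShutdownPolicySketch`, its helper methods, and the delay values are hypothetical and exist only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the java.util.concurrent calls come from the JDK.
class ShutdownPolicySketch {

    private static final Runnable NO_OP = new Runnable() {
        public void run() { }
    };

    /** Returns {periodicCancelled, oneShotCompleted, terminated} under the default policies. */
    static boolean[] defaultPolicies() throws Exception {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        // Far-future periodic task: it never actually runs in this sketch.
        ScheduledFuture<?> periodic =
            pool.scheduleAtFixedRate(NO_OP, 1, 1, TimeUnit.HOURS);
        ScheduledFuture<String> oneShot = pool.schedule(new Callable<String>() {
            public String call() { return "ran after shutdown"; }
        }, 50, TimeUnit.MILLISECONDS);

        pool.shutdown(); // onShutdown() removes and cancels the periodic task
        boolean periodicCancelled = periodic.isCancelled();
        // The delayed one-shot task stays queued and still runs.
        String result = oneShot.get(5, TimeUnit.SECONDS);
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[] {
            periodicCancelled, "ran after shutdown".equals(result), terminated
        };
    }

    /** With the policy set to true, a periodic task survives shutdown(). */
    static boolean periodicSurvivesShutdown() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        pool.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
        ScheduledFuture<?> periodic =
            pool.scheduleAtFixedRate(NO_OP, 1, 1, TimeUnit.HOURS);
        pool.shutdown();
        boolean stillScheduled = !periodic.isCancelled() && !pool.isTerminated();
        pool.shutdownNow(); // stop the pool so the JVM can exit
        return stillScheduled;
    }

    public static void main(String[] args) throws Exception {
        boolean[] r = defaultPolicies();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
        System.out.println(periodicSurvivesShutdown());
    }
}
```

The periodic task uses a one-hour period so that the sketch observes only queue handling at shutdown, not task execution timing.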