Java ScheduledExecutorService源码分析
Posted Just DO IT by luckygxf
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java ScheduledExecutorService源码分析相关的知识,希望对你有一定的参考价值。
Java 定时任务可以用Timer + TimerTask来做,或者使用ScheduledExecutorService,使用ScheduledExecutorService有两个好处:
1. 如果任务执行时间过长,TimerTask会出现延迟执行的情况。比如,第一任务在1000ms执行了4000ms,第二个任务定时在2000ms开始执行。这里由于第一个任务要执行4000,所以第二个任务实际在5000ms开始执行。这是由于Timer是单线程,且顺序执行提交的任务
2. 如果执行任务抛出异常,Timer是不会执行会后面的任务的
使用ScheduledExecutorService可以避免上面两种情况,因为ScheduledExecutorService是线程池,有多个线程执行。
下面是一个使用ScheduledExecutorService实现定时任务的Demo
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Created by gxf on 2017/6/26. */ public class TestSchedule { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(5); Task task = new Task(); scheduledExecutorService.scheduleAtFixedRate(task, -10, 1, TimeUnit.SECONDS); } } class Task implements Runnable{ public void run(){ System.out.println("do task..."); } }
这里每隔1秒,控制台会输出do task
ScheduledExecutorService是一个接口
public interface ScheduledExecutorService extends ExecutorService {
其中一个实现是
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
在Demo中
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
跟进ScheduledThreadPoolExecutor
/** * Creates a new {@code ScheduledThreadPoolExecutor} with the * given core pool size. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @throws IllegalArgumentException if {@code corePoolSize < 0} */ public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
这里Super调用
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
设定了线程池的各个参数,核心线程数,最大线程数,任务队列等。但这里还有线程创建,有任务提交了,才会创建线程池
继续Demo中
Task task = new Task();
这里只是创建了一个Runable对象
继续Demo
scheduledExecutorService.scheduleAtFixedRate(task, -10, 1, TimeUnit.SECONDS);
这里已经把任务提交给线程池了,进入方法
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
前面都是一些条件检查和包装,看最后的delayedExecute(t)
/** * Main execution method for delayed or periodic tasks. If pool * is shut down, rejects the task. Otherwise adds task to queue * and starts a thread, if necessary, to run it. (We cannot * prestart the thread to run the task because the task (probably) * shouldn‘t be run yet.) If the pool is shut down while the task * is being added, cancel and remove it if required by state and * run-after-shutdown parameters. * * @param task the task */ private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
ok,这里可以看到任务被添加到了一个队列里面。在看最后,ensurePrestart()
/** * Same as prestartCoreThread except arranges that at least one * thread is started even if corePoolSize is 0. */ void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
这里可以看出,如果线程池里面的线程数,小于核心线程数,会继续添加线程。进入addWork(null, true)
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get(); 5 int rs = runStateOf(c); 6 7 // Check if queue empty only if necessary. 8 if (rs >= SHUTDOWN && 9 ! (rs == SHUTDOWN && 10 firstTask == null && 11 ! workQueue.isEmpty())) 12 return false; 13 14 for (;;) { 15 int wc = workerCountOf(c); 16 if (wc >= CAPACITY || 17 wc >= (core ? corePoolSize : maximumPoolSize)) 18 return false; 19 if (compareAndIncrementWorkerCount(c)) 20 break retry; 21 c = ctl.get(); // Re-read ctl 22 if (runStateOf(c) != rs) 23 continue retry; 24 // else CAS failed due to workerCount change; retry inner loop 25 } 26 } 27 28 boolean workerStarted = false; 29 boolean workerAdded = false; 30 Worker w = null; 31 try { 32 w = new Worker(firstTask); 33 final Thread t = w.thread; 34 if (t != null) { 35 final ReentrantLock mainLock = this.mainLock; 36 mainLock.lock(); 37 try { 38 // Recheck while holding lock. 39 // Back out on ThreadFactory failure or if 40 // shut down before lock acquired. 41 int rs = runStateOf(ctl.get()); 42 43 if (rs < SHUTDOWN || 44 (rs == SHUTDOWN && firstTask == null)) { 45 if (t.isAlive()) // precheck that t is startable 46 throw new IllegalThreadStateException(); 47 workers.add(w); 48 int s = workers.size(); 49 if (s > largestPoolSize) 50 largestPoolSize = s; 51 workerAdded = true; 52 } 53 } finally { 54 mainLock.unlock(); 55 } 56 if (workerAdded) { 57 t.start(); 58 workerStarted = true; 59 } 60 } 61 } finally { 62 if (! workerStarted) 63 addWorkerFailed(w); 64 } 65 return workerStarted; 66 }
第32行和33行可以看出,任务已经提交给Work类了,第57行,t.start()这里启动线程,执行提交的任务
到这里,提交的任务已经开始执行了。这里,我们在看一下Worker这个包装类
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask;
可以看到有Thread, task,这里,其实就是把task指向要执行的任务,thread作为作为线程执行任务。
在ThreadPoolExecutor中
private final HashSet<Worker> workers = new HashSet<Worker>();
这个保存我们生成的线程,有了这个就不用重新创建和销毁线程了
我们在看一下Worker的run()方法
/** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }
继续跟进
1 final void runWorker(Worker w) { 2 Thread wt = Thread.currentThread(); 3 Runnable task = w.firstTask; 4 w.firstTask = null; 5 w.unlock(); // allow interrupts 6 boolean completedAbruptly = true; 7 try { 8 while (task != null || (task = getTask()) != null) { 9 w.lock(); 10 // If pool is stopping, ensure thread is interrupted; 11 // if not, ensure thread is not interrupted. This 12 // requires a recheck in second case to deal with 13 // shutdownNow race while clearing interrupt 14 if ((runStateAtLeast(ctl.get(), STOP) || 15 (Thread.interrupted() && 16 runStateAtLeast(ctl.get(), STOP))) && 17 !wt.isInterrupted()) 18 wt.interrupt(); 19 try { 20 beforeExecute(wt, task); 21 Throwable thrown = null; 22 try { 23 task.run(); 24 } catch (RuntimeException x) { 25 thrown = x; throw x; 26 } catch (Error x) { 27 thrown = x; throw x; 28 } catch (Throwable x) { 29 thrown = x; throw new Error(x); 30 } finally { 31 afterExecute(task, thrown); 32 } 33 } finally { 34 task = null; 35 w.completedTasks++; 36 w.unlock(); 37 } 38 } 39 completedAbruptly = false; 40 } finally { 41 processWorkerExit(w, completedAbruptly); 42 } 43 }
第8行getTask()取要执行的任务,第23行执行任务
继续跟进第8行getTask()
1 private Runnable getTask() { 2 boolean timedOut = false; // Did the last poll() time out? 3 4 for (;;) { 5 int c = ctl.get(); 6 int rs = runStateOf(c); 7 8 // Check if queue empty only if necessary. 9 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 10 decrementWorkerCount(); 11 return null; 12 } 13 14 int wc = workerCountOf(c); 15 16 // Are workers subject to culling? 17 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 18 19 if ((wc > maximumPoolSize || (timed && timedOut)) 20 && (wc > 1 || workQueue.isEmpty())) { 21 if (compareAndDecrementWorkerCount(c)) 22 return null; 23 continue; 24 } 25 26 try { 27 Runnable r = timed ? 28 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 29 workQueue.take(); 30 if (r != null) 31 return r; 32 timedOut = true; 33 } catch (InterruptedException retry) { 34 timedOut = false; 35 } 36 } 37 }
这里主要看第27-29,前面我们说过,任务是放到一个队列里面的。其实,是个阻塞队列,在队列为空的时候取,会阻塞线程。这里就用到了这个功能,在29行workQueue.take()如果没有任务,线程就会阻塞。有任务就会取任务进行执行。
简单点:线程池就是一个Set对象 + BlockingQueue对象, Workers + BlockingQueue
以上是关于Java ScheduledExecutorService源码分析的主要内容,如果未能解决你的问题,请参考以下文章