Java并发编程21线程池ThreadPoolExecutor源码解析
Posted Leon
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程21线程池ThreadPoolExecutor源码解析相关的知识,希望对你有一定的参考价值。
一、前言
JUC这部分还有线程池这一块没有分析,需要抓紧时间分析,下面开始ThreadPoolExecutor,其是线程池的基础,分析完了这个类会简化之后的分析,线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。下面开始分析。
二、ThreadPoolExecutor数据结构
在ThreadPoolExecutor的内部,主要由BlockingQueue和AbstractQueuedSynchronizer对其提供支持,BlockingQueue接口有多种数据结构的实现,如LinkedBlockingQueue、ArrayBlockingQueue等,而AbstractQueuedSynchronizer在之前有过详细的分析,有兴趣的读者可以参考。
三、ThreadPoolExecutor源码分析
3.1 类的继承关系
public class ThreadPoolExecutor extends AbstractExecutorService {}
说明:ThreadPoolExecutor继承自AbstractExecutorService,AbstractExecuetorService提供了ExecutorService执行方法的默认实现。
3.2 类的内部类
ThreadPoolExecutor的核心内部类为Worker,其对资源进行了复用,减少创建线程的开销,还有若干个策略类。内部类的类图如下
说明:可以看到Worker继承了AQS抽象类并且实现了Runnable接口,其是ThreadPoolExecutor的核心内部类。而对于AbortPolicy,用于被拒绝任务的处理程序,它将抛出 RejectedExecutionException、CallerRunsPolicy,用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务、DiscardPolicy,用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务、DiscardOldestPolicy,用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试 execute;如果执行程序已关闭,则会丢弃该任务。这些都是拒绝任务提交时的所采用的不同策略。
① Worker类
1. 类的继承关系
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {}
说明:Worker继承了AQS抽象类,其重写了AQS的一些方法,并且其也可作为一个Runnable对象,从而可以创建线程Thread。
2. 类的属性
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ // 版本号 private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ // worker 所对应的线程 final Thread thread; /** Initial task to run. Possibly null. */ // worker所对应的第一个任务 Runnable firstTask; /** Per-thread task counter */ // 已完成任务数量 volatile long completedTasks; }
说明:Worker属性中比较重要的属性如下,Thread类型的thread属性,用来封装worker(因为worker为Runnable对象),表示一个线程;Runnable类型的firstTask,其表示该worker所包含的Runnable对象,即用户自定义的Runnable对象,完成用户自定义的逻辑的Runnable对象;volatile修饰的long类型的completedTasks,表示已完成的任务数量。
3. 类的构造函数
Worker(Runnable firstTask) { // 设置状态为-1 setState(-1); // inhibit interrupts until runWorker // 初始化第一个任务 this.firstTask = firstTask; // 根据当前worker,初始化线程 this.thread = getThreadFactory().newThread(this); }
说明:用于构造一个worker对象,并设置AQS的state为-1,同时初始化了对应的域。
4. 核心函数分析
// 重写了Runnable的run方法 public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. // 是否被独占,0代表未被独占,1代表被独占 protected boolean isHeldExclusively() { return getState() != 0; } // 尝试获取 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { // 比较并设置状态成功 // 设置独占线程 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 尝试释放 protected boolean tryRelease(int unused) { // 设置独占线程为null setExclusiveOwnerThread(null); // 设置状态为0 setState(0); return true; } // 获取锁 public void lock() { acquire(1); } // 尝试获取锁 public boolean tryLock() { return tryAcquire(1); } // 释放锁 public void unlock() { release(1); } // 是否被独占 public boolean isLocked() { return isHeldExclusively(); } // void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // AQS状态大于等于0并且worker对应的线程不为null并且该线程没有被中断 try { // 中断线程 t.interrupt(); } catch (SecurityException ignore) { } } }
说明:Worker的函数主要是重写了AQS的相应函数和重写了Runnable的run函数,重写的函数比较简单,具体的可以参见AQS的分析,这里不再累赘。
3.3 类的属性
public class ThreadPoolExecutor extends AbstractExecutorService { // 线程池的控制状态(用来表示线程池的运行状态(整形的高3位)和运行的worker数量(低29位)) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 29位的偏移量 private static final int COUNT_BITS = Integer.SIZE - 3; // 最大容量(2^29 - 1) private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 线程运行状态,总共有5个状态,需要3位来表示(所以偏移量的29 = 32 - 3) private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // 阻塞队列 private final BlockingQueue<Runnable> workQueue; // 可重入锁 private final ReentrantLock mainLock = new ReentrantLock(); // 存放工作线程集合 private final HashSet<Worker> workers = new HashSet<Worker>(); // 终止条件 private final Condition termination = mainLock.newCondition(); // 最大线程池容量 private int largestPoolSize; // 已完成任务数量 private long completedTaskCount; // 线程工厂 private volatile ThreadFactory threadFactory; // 拒绝执行处理器 private volatile RejectedExecutionHandler handler; // 线程等待运行时间 private volatile long keepAliveTime; // 是否运行核心线程超时 private volatile boolean allowCoreThreadTimeOut; // 核心池的大小 private volatile int corePoolSize; // 最大线程池大小 private volatile int maximumPoolSize; // 默认拒绝执行处理器 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); }
说明:这里着重讲解一下AtomicInteger类型的ctl属性,ctl为线程池的控制状态,用来表示线程池的运行状态(整形的高3位)和运行的worker数量(低29位)),其中,线程池的运行状态有如下几种
/** * RUNNING : 接受新任务并且处理已经进入阻塞队列的任务 * SHUTDOWN : 不接受新任务,但是处理已经进入阻塞队列的任务 * STOP : 不接受新任务,不处理已经进入阻塞队列的任务并且中断正在运行的任务 * TIDYING : 所有的任务都已经终止,workerCount为0, 线程转化为TIDYING状态并且调用terminated钩子函数 * TERMINATED: terminated钩子函数已经运行完成 **/ private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
说明:由于有5种状态,最少需要3位表示,所以采用的AtomicInteger的高3位来表示,低29位用来表示worker的数量,即最多表示2^29 - 1。
3.4 类的构造函数
1. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>)型构造函数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
说明:该构造函数用给定的初始参数和默认的线程工厂及被拒绝的执行处理程序创建新的 ThreadPoolExecutor。
2. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory)型构造函数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }
说明:该构造函数用给定的初始参数和默认被拒绝的执行处理程序创建新的 ThreadPoolExecutor
3. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, RejectedExecutionHandler)型构造函数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
说明:该构造函数用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor
4. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory, RejectedExecutionHandler)型构造函数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || // 核心大小不能小于0 maximumPoolSize <= 0 || // 线程池的初始最大容量不能小于0 maximumPoolSize < corePoolSize || // 初始最大容量不能小于核心大小 keepAliveTime < 0) // keepAliveTime不能小于0 throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); // 初始化相应的域 this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
说明:该构造函数用给定的初始参数创建新的 ThreadPoolExecutor,其他的构造函数都会调用到此构造函数。
3.5 核心函数分析
1. execute函数
public void execute(Runnable command) { if (command == null) // 命令为null,抛出异常 throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn\'t, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ /* * 进行下面三步 * * 1. 如果运行的线程小于corePoolSize,则尝试使用用户定义的Runnalbe对象创建一个新的线程 * 调用addWorker函数会原子性的检查runState和workCount,通过返回false来防止在不应 * 该添加线程时添加了线程 * 2. 如果一个任务能够成功入队列,在添加一个线城时仍需要进行双重检查(因为在前一次检查后 * 该线程死亡了),或者当进入到此方法时,线程池已经shutdown了,所以需要再次检查状态, * 若有必要,当停止时还需要回滚入队列操作,或者当线程池没有线程时需要创建一个新线程 * 3. 如果无法入队列,那么需要增加一个新线程,如果此操作失败,那么就意味着线程池已经shut * down或者已经饱和了,所以拒绝任务 */ // 获取线程池控制状态 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // worker数量小于corePoolSize if (addWorker(command, true)) // 添加worker // 成功则返回 return; // 不成功则再次获取线程池控制状态 c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // 线程池处于RUNNING状态,将命令(用户自定义的Runnable对象)添加进workQueue队列 // 再次检查,获取线程池控制状态 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 线程池不处于RUNNING状态,将命令从workQueue队列中移除 // 拒绝执行命令 reject(command); else if (workerCountOf(recheck) == 0) // worker数量等于0 // 添加worker addWorker(null, false); } else if (!addWorker(command, false)) // 添加worker失败 // 拒绝执行命令 reject(command); }
当在客户端调用submit时,之后会间接调用到execute函数,其在将来某个时间执行给定任务,此方法中并不会直接运行给定的任务。此方法中主要会调用到addWorker函数。
addWorker函数说明:第一个参数firstTask不为null,则创建的线程就会先执行firstTask对象,然后去阻塞队列中取任务,否直接到阻塞队列中获取任务来执行。第二个参数,core参数为真,则用corePoolSize作为池中线程数量的最大值;为假,则以maximumPoolSize作为池中线程数量的最大值。
addWorker函数源码:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { // 外层无限循环 // 获取线程池控制状态 int c = ctl.get(); // 获取状态 int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && // 状态大于等于SHUTDOWN,初始的ctl为RUNNING,小于SHUTDOWN ! (rs == SHUTDOWN && // 状态为SHUTDOWN firstTask == null && // 第一个任务为null ! workQueue.isEmpty())) // worker队列不为空 // 返回 return false; for (;;) { // worker数量 int wc = workerCountOf(c); if (wc >= CAPACITY || // worker数量大于等于最大容量 wc >= (core ? corePoolSize : maximumPoolSize)) // worker数量大于等于核心线程池大小或者最大线程池大小 return false; if (compareAndIncrementWorkerCount(c)) // 比较并增加worker的数量 // 跳出外层循环 break retry; // 获取线程池控制状态 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 此次的状态与上次获取的状态不相同 // 跳过剩余部分,继续循环 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // worker开始标识 boolean workerStarted = false; // worker被添加标识 boolean workerAdded = false; // Worker w = null; try { // 初始化worker w = new Worker(firstTask); // 获取worker对应的线程 final Thread t = w.thread; if (t != null) { // 线程不为null // 线程池锁 final ReentrantLock mainLock = this.mainLock; // 获取锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 线程池的运行状态 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || // 小于SHUTDOWN (rs == SHUTDOWN && firstTask == null)) { // 等于SHUTDOWN并且firstTask为null if (t.isAlive()) // precheck that t is startable // 线程刚添加进来,还未启动就存活 // 抛出线程状态异常 throw new IllegalThreadStateException(); // 将worker添加到worker集合 workers.add(w); // 获取worker集合的大小 int s = workers.size(); if (s > largestPoolSize) // 队列大小大于largestPoolSize // 重新设置largestPoolSize largestPoolSize = s; // 设置worker已被添加标识 workerAdded = true; } } finally { // 释放锁 mainLock.unlock(); } if (workerAdded) { // worker被添加 // 开始执行worker的run方法 t.start(); // 设置worker已开始标识 workerStarted = true; } } } finally { if (! workerStarted) // worker没有开始 // 添加worker失败 addWorkerFailed(w); } return workerStarted; }
说明:此函数可能会完成如下几件任务
① 通过spin和cas增加workcount
② 生成封装新thread和firsttask的work,worker继承了aqs,通过mainLock加锁线程安全的将此worker添加进workers集合中,更新largestPoolSize。
③ 启动worker对应的线程,运行runWorker方法。
④ 如果worker创建失败,回滚,将worker从workers集合中删除,并原子性的减少workerCount。
2. runWorker函数
final void runWorker(Worker w) { // 获取当前线程 Thread wt = Thread.currentThread(); // 获取w的firstTask Runnable task = w.firstTask; // 设置w的firstTask为null w.firstTask = null; // 释放锁(设置state为0,允许中断) w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 任务不为null或者阻塞队列还存在任务 // 获取锁 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || // 线程池的运行状态至少应该高于STOP (Thread.interrupted() && // 线程被中断 runStateAtLeast(ctl.get(), STOP))) && // 再次检查,线程池的运行状态至少应该高于STOP !wt.isInterrupted()) // wt线程(当前线程)没有被中断 wt.interrupt(); // 中断wt线程(当前线程) try { // 在执行之前调用钩子函数 beforeExecute(wt, task); Throwable thrown = null; try { // 运行给定的任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 执行完后调用钩子函数 afterExecute(task, thrown); } } finally { task = null; // 增加给worker完成的任务数量 w.completedTasks++; // 释放锁 w.unlock(); } } completedAbruptly = false; } finally { // 处理完成后,调用钩子函数 processWorkerExit(w, completedAbruptly); } }
说明:此函数中会实际执行给定任务(即调用用户重写的run方法),并且当给定任务完成后,会继续从阻塞队列中取任务,直到阻塞队列为空(即任务全部完成)。在执行给定任务时,会调用钩子函数,利用钩子函数可以完成用户自定义的一些逻辑。在runWorker中会调用到getTask函数和processWorkerExit钩子函数,其中,getTask函数源码如下
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { // 无限循环,确保操作成功 // 获取线程池控制状态 int c = ctl.get(); // 运行的状态 int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 大于等于SHUTDOWN(表示调用了shutDown)并且(大于等于STOP(调用了shutDownNow)或者worker阻塞队列为空) // 减少worker的数量 decrementWorkerCount(); // 返回null,不执行任务 return null; } // 获取worker数量 int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 是否允许coreThread超时或者workerCount大于核心大小 if ((wc > maximumPoolSize || (timed && timedOut)) // worker数量大于maximumPoolSize && (wc > 1 || workQueue.isEmpty())) { // workerCount大于1或者worker阻塞队列为空(在阻塞队列不为空时,需要保证至少有一个wc) if (compareAndDecrementWorkerCount(c)) // 比较并减少workerCount // 返回null,不执行任务,该worker会退出 return null; // 跳过剩余部分,继续循环 continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 等待指定时间 workQueue.take(); // 一直等待,直到有元素 if (r != null) return r; // 等待指定时间后,没有获取元素,则超时 timedOut = true; } catch (InterruptedException retry) { // 抛出了被中断异常,重试,没有超时 timedOut = false; } } }
说明:此函数用于从workerQueue阻塞队列中获取Runnable对象,由于是阻塞队列,所以支持有限时间等待(poll)和无限时间等待(take)。在该函数中还会响应shutDown和、shutDownNow函数的操作,若检测到线程池处于SHUTDOWN或STOP状态,则会返回null,而不再返回阻塞队列中的Runnalbe对象。
processWorkerExit函数是在worker退出时调用到的钩子函数,而引起worker退出的主要因素如下
① 阻塞队列已经为空,即没有任务可以运行了。
② 调用了shutDown或shutDownNow函数
processWorkerExit的源码如下
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 如果被中断,则需要减少workCount // If abrupt, then workerCount wasn\'t adjusted decrementWorkerCount(); // 获取可重入锁 final ReentrantLock mainLock = this.mainLock; // 获取锁 mainLock.lock(); try以上是关于Java并发编程21线程池ThreadPoolExecutor源码解析的主要内容,如果未能解决你的问题,请参考以下文章android线程与线程池-----线程池《android开发艺术与探索》
Java 并发编程线程池机制 ( 线程池示例 | newCachedThreadPool | newFixedThreadPool | newSingleThreadExecutor )