Java 1.6 ThreadPoolExecutor源码解析
Posted Mr-yuenkin
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 1.6 ThreadPoolExecutor源码解析相关的知识,希望对你有一定的参考价值。
由于本人水平与表达能力有限,有错误的地方欢迎交流与指正。
一、 成员变量介绍
public class ThreadPoolExecutor extends AbstractExecutorService
// 线程状态变迁图:
// RUNNING->SHUTDOWN,通过调用shutdown方法
// RUNNING/SHUTDOWN->STOP,调用shutdwonNow方法
// SHUTDOWN->TERMINATED,当workQueue为空且poolSize=0
// STOP->TERMINATED,当poolSize=0
volatile int runState;
// RUNNING状态能接受新任务且能处理队列里的任务
static final int RUNNING = 0;
// 不能接受新任务,但是能处理队列里的任务
static final int SHUTDOWN = 1;
// 不能接受新任务,不能处理队列里的任务
static final int STOP = 2;
// 和STOP一样,且线程池里所有线程都停止了,已没有可用线程了
static final int TERMINATED = 3;
// 任务队列,当poolSize==corePoolSize时新任务会优先放入这个队列里
private final BlockingQueue<Runnable> workQueue;
/**
* 一个全局的锁用来保护poolSize, corePoolSize,
* maximumPoolSize, runState, and workers set等变量的修改。
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* awaitTermination方法里会用到这个条件
*/
private final Condition termination = mainLock.newCondition();
/**
* 用来执行用户提交的任务,可以理解为线程池中的线程
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* worker线程从workQueue里poll任务的超时时间;
* 如果当前线程个数>corePoolSize,且allowCoreThreadTimeOut=true线程池会动态收缩线程的数目,注意:单位是纳秒
*/
private volatile long keepAliveTime;
/**
* 是否允许线程池动态收缩
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 当池中线程数小于这个值提交新任务时才可能会new一个thread
*/
private volatile int corePoolSize;
/**
* 线程池允许的最大线程数目
*/
private volatile int maximumPoolSize;
/**
* 线程池当前的线程数目
*/
private volatile int poolSize;
二、 shutdown函数
public void shutdown()
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
if (security != null) // Check if caller can modify our threads
for (Worker w : workers)
security.checkAccess(w.thread);
int state = runState;
if (state < SHUTDOWN)
// 一、runState设置成SHUTDOWN
runState = SHUTDOWN;
try
for (Worker w : workers)
// 二、中断所有空闲的workder线程
w.interruptIfIdle();
catch (SecurityException se) // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
// 三、尝试terminate线程池(如果满足条件)
tryTerminate(); // Terminate now if pool and queue empty
finally
mainLock.unlock();
void interruptIfIdle()
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock())
try
if (thread != Thread.currentThread())
thread.interrupt();
finally
runLock.unlock();
private void tryTerminate()
if (poolSize == 0)
int state = runState;
if (state < STOP && !workQueue.isEmpty())
state = RUNNING; // disable termination check below
Thread t = addThread(null);
if (t != null)
t.start();
if (state == STOP || state == SHUTDOWN)
runState = TERMINATED;
termination.signalAll();
terminated();
shutdown函数主要做了三个操作: 1、将线程池的状态设置成SHUTDOWN; 2、中断所有空闲的worker线程; 所谓的中断就是调用worker线程的interrupt()方法。如何判断一个worker线程是否空闲?答案就是worker的成员变量runLock,如果interruptIfIdle函数中runLock.tryLock成功,就能说明worker线程一定是空闲的,因为worker的runTask方法在执行任务前也是要获取runLock锁的,执行完毕会释放。 3、尝试终止线程池(把runState状态改成TERMINATED)。 如果当前线程池线程数目为0,两种情况需要考虑: a、线程池没有被STOP且workQueue不空,这种情况是需要处理workQueue里排队的任务的,因此要new一个thread来处理还在排队的任务; b、线程池被STOP或SHUTDOWN且workQueue里没有排队的任务了,此时直接将状态改成TERMINATED,且通知调用awaitTermination方法的线程(线程池TERMINATED了)
三、 shutdownNow函数
public List<Runnable> shutdownNow()
/*
* shutdownNow differs from shutdown only in that
* 1. runState is set to STOP,
* 2. all worker threads are interrupted, not just the idle ones, and
* 3. the queue is drained and returned.
*/
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
if (security != null) // Check if caller can modify our threads
for (Worker w : workers)
security.checkAccess(w.thread);
int state = runState;
if (state < STOP)
runState = STOP; // 一、状态改成STOP,和shutdown不同
try
for (Worker w : workers)
w.interruptNow(); // 二、中断所有的worker,不管是不是空闲的
catch (SecurityException se) // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
// 三、返回workQueue里排队的任务
List<Runnable> tasks = drainQueue(); // 四、尝试关闭线程池
tryTerminate(); // Terminate now if pool and queue empty
return tasks;
finally
mainLock.unlock();
void interruptNow()
thread.interrupt();
shutdownNow函数结构和shutdown结构类似,有些区别: 1、把线程池的状态设置成STOP; 2、调用worker的interruptNow方法,即中断所有worker线程,不管是不是空闲的; 3、这个函数会返回当前workQueue里排队的任务列表,是带返回值的,这些队列里的任务会被删除永远不会被执行了。 小提醒:thread的interrupt方法仅仅是线程的中断标示,某些阻塞方法如thread.sleep join wait及某些IO操作会响应中断并抛出InterruptedException,如果自定义的task是个计算密集的操作或其他不响应中断的操作,shutdownNow调用后还是要等线程执行结束后才能变成TERMINATED状态,要是你的任务是个死循环且不会抛出InterruptedException,那线程池就永远关不掉了。
四、 execute函数
public void execute(Runnable command)
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
if (runState == RUNNING && workQueue.offer(command))
// 防止并发情况,比如另一个线程在本线程调用玩workQueue.offer后就执行了shutdonwNow方法
// 这时需要把刚提交的任务从workQueue中remove掉
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
private boolean addIfUnderCorePoolSize(Runnable firstTask)
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
// 如果当前池中线程数量小于corePoolSize且池是RUNNING的,则new一个thread立即执行提交的任务
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
finally
mainLock.unlock();
if (t == null)
return false;
t.start();
return true;
private boolean addIfUnderMaximumPoolSize(Runnable firstTask)
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
// 线程会优先入workQueue的,如果入队失败就会执行到这里;
// 如果池中线程数量小于最大值且线程是RUNNING的,则立马new个thread立即执行刚提交的任务
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
finally
mainLock.unlock();
if (t == null)
return false;
t.start();
return true;
private void ensureQueuedTaskHandled(Runnable command)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean reject = false;
Thread t = null;
try
int state = runState;
// 感觉这里条件state != RUNNING是有问题的,因为SHUTDOWN状态是可以处理队列里的任务的,不能remove掉;应该改成state>=STOP
if (state != RUNNING && workQueue.remove(command))
reject = true;
// 如果队列不空则需要保留一些线程来处理队列里的任务
else if (state < STOP &&
poolSize < Math.max(corePoolSize, 1) &&
!workQueue.isEmpty())
t = addThread(null);
finally
mainLock.unlock();
if (reject)
reject(command);
else if (t != null)
t.start();
/**
* 如果不指定该参数,则默认为DefaultThreadFactory类,线程名字格式为pool-poolNumber-thread-theadNumber,每创建一个线程池poolNumber就会增1,每创建一个线程,该池的threadNumber就会增1。
* 线程是非后台线程,优先级为NORM_PRIORITY(5)
*/
private Thread addThread(Runnable firstTask)
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
if (t != null)
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
return t;
execute方法是执行提交的任务(不一定立马执行,可能是先放在队列里、也有可能直接拒绝了),分两种情况: 情况1:poolSize<corePoolSize,调用addIfUnderCorePoolSize方法新增一个worker 情况2:poolSize>=corePoolSize, ①尝试调用workQueue.offer方法,如果入队列失败则到步骤②; ②addIfUnderMaximumPoolSize方法扩大线程池中可用线程个数目,如果这步也失败了则到③; ③执行reject方法
五、 内部类Worker
private final class Worker implements Runnable
// 每次运行task都要获取这个锁防止interrupt方法把正在运行的任务也中断了
private final ReentrantLock runLock = new ReentrantLock();
private Runnable firstTask;
// 用来累计这个worker已经完成了多少个任务了,worker退出时可能需要把这个值累计到线程池已经完成的任务上
volatile long completedTasks;
// 运行这个worker的线程
Thread thread;
主循环:
public void run()
try
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null)
runTask(task);
task = null;
finally
workerDone(this);
Runnable getTask()
for (;;)
try
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
// 如果仅仅是SHUTDOWN,队列里的任务还是要执行的
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
// 如果当前线程数大于corePoolSize或者允许动态收缩,则采用超时获取的方式(因为如果workQueue一直为空线程会一直挂住,就不能动态收缩了)
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
// 否则,无限等待任务
r = workQueue.take();
if (r != null)
return r;
// 当前worker线程是否可以退出
if (workerCanExit())
// 如果此时线程池已经SHUTDOWN了,那么需要向其他空闲线程
// 发个中断信号不要一直阻塞在workQueue.take或poll方法中,线程可能需要被回收掉
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
// Else retry
catch (InterruptedException ie)
// 被中断了,继续循环检查线程池的状态
// 当workQueue中没有任务时会调用这个方法
// worker退出的条件:①线程池STOP了;②线程池虽然是RUNNING或者SHUTDOWN,但是任务队列空了;③允许动态收缩且当前线程数大于corePoolSize
private boolean workerCanExit()
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
try
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize));
finally
mainLock.unlock();
return canExit;
private void runTask(Runnable task)
final ReentrantLock runLock = this.runLock;
// 加个锁表明该worker是busy的
runLock.lock();
try
/*
* 除非线程池状态是STOP,否则任务运行前需要清空中断标识;
* 但是万一”runState < STOP && Thread.interrupted()”
* 之间穿插了一个shutdownNow了该怎么办?因此,需要再次检查下
* runState的状态,如果是STOP则要重置下异常标识位
*/
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
/*
* 一个标志位,保证task.run方法抛出RuntimeException后是否需要回调afterExecute方法
*/
boolean ran = false;
beforeExecute(thread, task);
try
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
catch (RuntimeException ex)
if (!ran)
afterExecute(task, ex);
throw ex;
finally
runLock.unlock();
void workerDone(Worker w)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
// 把worker线程完成的任务数累计到线程池中,并从workers中删除掉
completedTaskCount += w.completedTasks;
workers.remove(w);
if (--poolSize == 0)
tryTerminate();
finally
mainLock.unlock();
private void tryTerminate()
// 线程池状态变成TERMINATED的条件是:
// 1、poolSize=0、SHUTDOWN且workQueue为空
// 2、poolSize=0、STOP
// 否则,至少要保证线程池中有一个线程处理队列里的消息
if (poolSize == 0)
int state = runState;
if (state < STOP && !workQueue.isEmpty())
state = RUNNING; // disable termination check below
Thread t = addThread(null);
if (t != null)
t.start();
if (state == STOP || state == SHUTDOWN)
runState = TERMINATED;
termination.signalAll();
terminated();
Worker.run方法很清晰,循环从workQueue里获取task(getTask),然后执行task(runTask),如果获取不到task,线程就退出了,注意:如果你的task里抛出异常,那么这个worker线程也会退出,但是不要紧线程池会再new一个补充的。
以上是关于Java 1.6 ThreadPoolExecutor源码解析的主要内容,如果未能解决你的问题,请参考以下文章
Java 1.6 HttpsURLConnection:java.net.SocketException:连接重置