浅析ThreadPoolExecutor
Posted 乔不思
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅析ThreadPoolExecutor相关的知识,希望对你有一定的参考价值。
ThreadPoolExecutor是jdk自带的线程池实现,其中他有四种常用的线程池模式通过ExecutorService获取:newSingleThreadExecutor,newSingleThreadExecutor,newCachedThreadPool,newFixedThreadPool,这四中是jdk自己经过对ThreadPoolExecutor的封装实现不同的线程池类型,今天我们来大概理解下ThreadPoolExecutor。
线程池的构建与接收任务
这块就大概阐述下,好多博客都在说这个。
线程初始化,通过int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
这些参数,其中
- corePoolSize 是线程池的核心线程数。
- maxPoolSize 是最大线程数
- keepAliveTime 线程池的空闲线程保持活跃的时间,前提是线程池的线程数大于corePoolSize,设置这个的目的是为了让jvm不来回的创建线程,已创建线程的任务已经提交归还给了线程池,那么这个线程不会里面走到terminal,而是会等待keepAliveTime的时间,看看是不是有新的任务进来,如果有新的任务,就直接拿取任务去执行,如果没有,则过KeepAliveTime的时间就自动消亡。
- unit 标识keepAliveTime的时间单位
- workQueue 当线程池的线程数已经大于coolPoolSize但是小于maxPoolSize的时候,就会先把线程先offer进阻塞队列,等到有空闲现成的时候就去阻塞队列中取。
- threadFactory 线程工厂,可不传,设定现成的创建方式 ,有默认的线程工厂
- handler 线程池的拒绝策略或者也叫抛弃策略,当线程数达到最大线程数或者线程池已经执行了shutDown或者shutDownNow方法到达shutdown,stop或者terminated状态的时候,就会执行这个策略,有四种:AbortPolicy 直接抛RejectedExecutionException;DiscardPolicy 直接丢弃该任务;DiscardOldestPolicy 丢弃等待队列中最老的任务;CallerRunsPolicy 创建一个新的线程执行该任务(当线程池没有执行shutdown()或者shutdownDown()方法的时候,也就是说处于运行中的状态)。
接受线程:
当一个任务到来时,首先会去判断当前线程池的poolSize是不是大于corePoolSize,如果小于,则直接创建新的线程执行;当poolSize达到corePoolSize,就会直接往workQueue中放,当workQueue存放满了,就会判断poolSize是不是小于maxPoolSize,如果小于,则创建新的线程,如果大于,则执行抛弃策略。
具体看ThreadPoolExecutor的execute方法jdk1.8
“`java
public void execute(Runnable command)
if (command == null)
throw new NullPointerException();
//先去判断poolSize和corePoolSize的大小,|| 是短路运算符,首次添加肯定会走后面的addIfUnderCorePoolSize方法
//这个方法就是党poolSize小于corePoolSize的时候直接创建线程执行任务
//当poolSize >=corePoolSize的时候就直接执行workQueue.offer(command) 这个方法,如果存放失败,则直接返回
//false,则执行addIfUnderMaximumPoolSize方法,这个会判断是不是poolSize是不是大于maxPoolSize,如果大于则
//返回false,不大于则创建线程执行任务返回true.
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
if (runState == RUNNING && workQueue.offer(command))
if (runState != RUNNING || poolSize == 0)
//保证线程池执行了shutDown或者shutDownNow,不接受新的任务。如果是shutDownNow则直接执行抛弃策略,如果是shutDown则让保证至少有一个线程,把队列中待执行的任务执行完。
ensureQueuedTaskHandled(command);
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
这里就不帖 具体的方法代码了,可以自己看下。
execute方法和submit方法
execute方法是ThreadPoolExecutor实现接口Executor的方法,而submit则是实现接口ExecutorService的方法,ExecutorService描述了线程池的生命周期 ExecutorService继承了Executor。有点绕。。 自己看源码
在ThreadPoolExecutor中是找不到submit方法的,发现是在其父抽象类AbstractExecutorService中实现的,见下面代码:
“`java
public Future<?> submit(Runnable task)
if (task == null) throw new NullPointerException();
RunnableFuture<Object> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
public <T> Future<T> submit(Runnable task, T result)
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
public <T> Future<T> submit(Callable<T> task)
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
可以看见submit是支持Callable和Runnable参数的,并且有返回值。这个返回值刚好是一个RunnableFutrue,RunnableFutrue具有Runnable和Future的特性,也就是说能作为线程入参并且可以带返回值,返回值通过Futrue的get方法得到,代码中使用newTaskFor方法实现Callable,Runable到RunnableFutrue的转换,可以理解为newTaskFor方法就是一个适配器,将Runnable和Callable参数适配成为RunnableFutrue对象,转换完成后然后调用ThreadPoolExecutor的execute方法,则为上面execute方法,可见 execute没有返回值,并且接受的参数是Runnable对象。
shutDown和shutDownNow方法
ThreadPoolExecutor有四种状态:
-static final int RUNNING = 0;
-static final int SHUTDOWN = 1;
-static final int STOP = 2;
-static final int TERMINATED = 3;
转化关系:
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TERMINATED
* When both queue and pool are empty
* STOP -> TERMINATED
* When pool is empty
当线程池调用了shutDown方法,线程池不会里面停止,只是将线程状态转化为SHUTDOWN状态,并且尝试中断正在执行中的线程,线程中断调用的是Thread的interrupt方法(该方法只有在中断监听方法 wait,park,sleep等才能中断线程,其他时候不会中断线程,只是修改了现成的中断标识为true,而不会停止线程的执行) 然后会等到workQueue中的任务都被执行完才会到达TERMINATED状态。
具体代码:
“`java
public void shutdown()
/*
* Conceptually, shutdown is just a matter of changing the
* runState to SHUTDOWN, and then interrupting any worker
* threads that might be blocked in getTask() to wake them up
* so they can exit. Then, if there happen not to be any
* threads or tasks, we can directly terminate pool via
* tryTerminate. Else, the last worker to leave the building
* turns off the lights (in workerDone).
*
* But this is made more delicate because we must cooperate
* with the security manager (if present), which may implement
* policies that make more sense for operations on Threads
* than they do for ThreadPools. This requires 3 steps:
*
* 1. Making sure caller has permission to shut down threads
* in general (see shutdownPerm).
*
* 2. If (1) passes, making sure the caller is allowed to
* modify each of our threads. This might not be true even if
* first check passed, if the SecurityManager treats some
* threads specially. If this check passes, then we can try
* to set runState.
*
* 3. If both (1) and (2) pass, dealing with inconsistent
* security managers that allow checkAccess but then throw a
* SecurityException when interrupt() is invoked. In this
* third case, because we have already set runState, we can
* only try to back out from the shutdown as cleanly as
* possible. Some workers may have been killed but we remain
* in non-shutdown state (which may entail tryTerminate from
* workerDone starting a new worker to maintain liveness.)
*/
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
if (security != null) // Check if caller can modify our threads
for (Worker w : workers)
security.checkAccess(w.thread);
int state = runState;
if (state < SHUTDOWN)
//修改状态为SHUTDOWN
runState = SHUTDOWN;
try
for (Worker w : workers)
//尝试中断线程
w.interruptIfIdle();
catch (SecurityException se) // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
//如果poolSize为0,但是workQueue中还有任务,则修改状态为RUNNING,开启一个线程执行workQueue中的任务,如果workQueue为空,则直接修改线程池状态为TERMINATED
tryTerminate(); // Terminate now if pool and queue empty
finally
mainLock.unlock();
可以看出shutDown没有返回值,并且是优雅停线程池
shutDownNow的方法:
“`java
public List shutdownNow()
/*
* shutdownNow differs from shutdown only in that
* 1. runState is set to STOP,
* 2. all worker threads are interrupted, not just the idle ones, and
* 3. the queue is drained and returned.
*/
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
if (security != null) // Check if caller can modify our threads
for (Worker w : workers)
security.checkAccess(w.thread);
int state = runState;
if (state < STOP)
//修改状态为STOP
runState = STOP;
try
for (Worker w : workers)
//尝试中断
w.interruptNow();
catch (SecurityException se) // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
//得到workQueue的所有任务
List<Runnable> tasks = drainQueue();
tryTerminate(); // Terminate now if pool and queue empty
//但会任务队列
return tasks;
finally
mainLock.unlock();
则是直接修改线程状态为STOP,尝试中指所有线程,和shutDown一样
得到workQueue的所有任务,并且返回,如果状态大于RUNNING (SHUTDOWN,STOP,TERMIANTED)则线程池不在接收新的任务。显然shutDownNow不够优雅,谨慎选择。
tryTerminal方法:
“`java
private void tryTerminate()
if (poolSize == 0)
int state = runState;
//shutDown走这里会先去判断workQueue是否为空
if (state < STOP && !workQueue.isEmpty())
state = RUNNING; // disable termination check below
Thread t = addThread(null);
if (t != null)
t.start();
//shutDownNow直接走这里,直接TERMINATED
if (state == STOP || state == SHUTDOWN)
runState = TERMINATED;
termination.signalAll();
terminated();
ThreadPoolExecutor的Worker
Worker类可以看作是ThreadPoolExecuter线程池中受理任务的活跃线程。
“`java
private final class Worker implements Runnable
/**
* The runLock is acquired and released surrounding each task
* execution. It mainly protects against interrupts that are
* intended to cancel the worker thread from instead
* interrupting the task being run.
*/
private final ReentrantLock runLock = new ReentrantLock();
/**
* Initial task to run before entering run loop. Possibly null.
*/
private Runnable firstTask;
/**
* Per thread completed task counter; accumulated
* into completedTaskCount upon termination.
*/
volatile long completedTasks;
/**
* Thread this worker is running in. Acts as a final field,
* but cannot be set until thread is created.
*/
Thread thread;
Worker(Runnable firstTask)
this.firstTask = firstTask;
boolean isActive()
return runLock.isLocked();
/**
* Interrupts thread if not running a task.
*/
void interruptIfIdle()
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock())
try
if (thread != Thread.currentThread())
thread.interrupt();
finally
runLock.unlock();
/**
* Interrupts thread even if running a task.
*/
void interruptNow()
thread.interrupt();
/**
* Runs a single task between before/after methods.
*/
private void runTask(Runnable task)
final ReentrantLock runLock = this.runLock;
runLock.lock();
try
/*
* Ensure that unless pool is stopping, this thread
* does not have its interrupt set. This requires a
* double-check of state in case the interrupt was
* cleared concurrently with a shutdownNow -- if so,
* the interrupt is re-enabled.
*/
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
/*
* Track execution state to ensure that afterExecute
* is called only if task completed or threw
* exception. Otherwise, the caught runtime exception
* will have been thrown by afterExecute itself, in
* which case we don't want to call it again.
*/
boolean ran = false;
//待实现方法,子类实现,
beforeExecute(thread, task);
try
task.run();
ran = true;
//待实现方法,子类实现,
afterExecute(task, null);
++completedTasks;
catch (RuntimeException ex)
if (!ran)
afterExecute(task, ex);
throw ex;
finally
runLock.unlock();
/**
* Main run loop
*/
public void run()
try
Runnable task = firstTask;
firstTask = null;
//firstTask保存当前该线程受理的任务,如果没有任务则会执行getTask方法。
//具体参看getTask方法,这块用的while循环,说明如果firstTask为空则一直会执行getTask方法,
//直到getTask方法返回null
while (task != null || (task = getTask()) != null)
runTask(task);
task = null;
finally
//pollSize--
workerDone(this);
“`java
Runnable getTask()
for (;;)
try
int state = runState;
//返回null
if (state > SHUTDOWN)
return null;
Runnable r;
//执行了shutdown方法,则需要吧workQueue中的任务执行完
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
//设置了keepAliveTime或者workQueue中有可能有待处理的
//任务(poolSize > corePoolSize 表示如果有任务出来,会往workQueue中放,或者直接创建线程 –maxPoolSize>poolSize>corePoolSize)
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
//如果keepAliaveTime的时间没拿到任务,r就为null,则该Worker就销毁了执行workerCanExit方法
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
//直接阻塞等到workQueue的任务
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit())
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
// Else retry
catch (InterruptedException ie)
// On interruption, re-check runState
worker的代码相对比较好理解。排版很乱,大概写一下!
线程池经常用,大概写一下,好多代码没贴出来,可以翻看jdk源码
以上是关于浅析ThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章
Java 线程池 ThreadPoolExecutor 八种拒绝策略浅析