Java中的线程池——ThreadPoolExecutor源代码分析
Posted WSYW126
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java中的线程池——ThreadPoolExecutor源代码分析相关的知识,希望对你有一定的参考价值。
线程池ThreadPoolExecutor的使用说明和变量的定义
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 使用一个ctl同时维护线程池的状态和线程数量,不仅仅是为了通过位运算提高效率,能够有效避免两者不一致的情况,如果2个地方存储,可能需要锁去保证一致性。(因为线程池源码同时操作线程池状态和线程数量的地方挺多的)(volatile)
private static final int COUNT_BITS = Integer.SIZE - 3; //32-3=29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 00028个11
// runState is stored in the high-order bits 把运行状态保存在最高的三位
private static final int RUNNING = -1 << COUNT_BITS; //11128个00
private static final int SHUTDOWN = 0 << COUNT_BITS; //00028个00
private static final int STOP = 1 << COUNT_BITS; //00128个00
private static final int TIDYING = 2 << COUNT_BITS; //01028个00
private static final int TERMINATED = 3 << COUNT_BITS; //01128个00
// Packing and unpacking ctl
private static int runStateOf(int c) return c & ~CAPACITY; //~CAPACITY = 11128个00 ,位运算取最高三位
private static int workerCountOf(int c) return c & CAPACITY; //位运算取除去最高三位后的29位。
private static int ctlOf(int rs, int wc) return rs | wc; //或运算,拼装ctl
/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
private static boolean runStateLessThan(int c, int s)
return c < s; //比较运行状态
private static boolean runStateAtLeast(int c, int s)
return c >= s; //比较运行状态
private static boolean isRunning(int c)
return c < SHUTDOWN; //不能用等于,因为只有最高三位是状态位。
/**
* Attempts to CAS-increment the workerCount field of ctl.
*/
private boolean compareAndIncrementWorkerCount(int expect)
return ctl.compareAndSet(expect, expect + 1); //CAS
/**
* Attempts to CAS-decrement the workerCount field of ctl.
*/
private boolean compareAndDecrementWorkerCount(int expect)
return ctl.compareAndSet(expect, expect - 1); //CAS
/**
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
*/
private void decrementWorkerCount()
do while (! compareAndDecrementWorkerCount(ctl.get())); //自旋等待
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
private final BlockingQueue<Runnable> workQueue; //任务队列用于放提交到线程池的任务,并有任务线程处理。
/**
* Lock held on access to workers set and related bookkeeping.
* While we could use a concurrent set of some sort, it turns out
* to be generally preferable to use a lock. Among the reasons is
* that this serializes interruptIdleWorkers, which avoids
* unnecessary interrupt storms, especially during shutdown.
* Otherwise exiting threads would concurrently interrupt those
* that have not yet interrupted. It also simplifies some of the
* associated statistics bookkeeping of largestPoolSize etc. We
* also hold mainLock on shutdown and shutdownNow, for the sake of
* ensuring workers set is stable while separately checking
* permission to interrupt and actually interrupting.
*/
private final ReentrantLock mainLock = new ReentrantLock(); //当需要访问任务线程集合和相关的记录需要,加锁。
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>(); //线程池任务线程集,当持有mainLock锁时,可以访问线程池任务线程集
/**
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition(); //等待线程池结束条件
/**
* Tracks largest attained pool size. Accessed only under
* mainLock.
*/
private int largestPoolSize; //在持有mainLock的情况下,追踪最大线程池
/**
* Counter for completed tasks. Updated only on termination of
* worker threads. Accessed only under mainLock.
*/
private long completedTaskCount; //在持有mainLock的情况下,可以访问,completedTaskCount为完成任务计数器,在任务线程结束时更新。
/*
* All user control parameters are declared as volatiles so that
* ongoing actions are based on freshest values, but without need
* for locking, since no internal invariants depend on them
* changing synchronously with respect to other actions.
*/
/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread.start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
*/
private volatile ThreadFactory threadFactory; //创建任务线程的工厂
/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler; //当线程池饱和或线程池关闭时,拒绝任务处理handler
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private volatile long keepAliveTime; //线程池空闲任务线程,等待任务的时间。如果当前线程数量大于核心线程池数量,且allowCoreThreadTimeOut为true,任务线程空闲,允许等待keepAliveTime时间,以便在这个时间范围内,有任务需要执行。
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut; //在当前线程数量大于核心线程池数量的情况下,是否允许空闲任务线程等,保活keepAliveTime时间,等待任务的到来。
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize; //在不允许空闲等待的情况,核心线程池数量,即保活的任务线程最小数量。如果允许空闲等待,线程池任务线程可能为0。
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize; // 最大线程池数量,如果容量是有界的,实际为CAPACITY
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy(); //默认的拒绝任务策略,抛出运行时异常
小结
ThreadPoolExecutor的变量主要有:
核心线程池数量corePoolSize。
最大线程池数量maximumPoolSize。
allowCoreThreadTimeOut 允许空闲任务线程。
保活keepAliveTime时间,等待新任务的到来。
线程工厂ThreadFactory用于创建任务线程。
拒绝任务处理器RejectedExecutionHandler,默认的拒绝任务策略为AbortPolicy,抛出运行时异常。还有:直接丢弃策略DiscardPolicy、丢弃旧的任务DiscardOldestPolicy、调用者执行任务策略CallerRunsPolicy。
上面的变量为volatile,保证可见性,以便线程池执行操作时,可以使用最新的变量。
阻塞的任务队列final BlockingQueue<Runnable> workQueue。
AtomicInteger的ctl用于包装线程状态runState和任务线程数workerCount。
任务线程集final HashSet<Worker> workers。
largestPoolSize记录线程池的最大任务线程数。
completedTaskCount为完成任务计数器,在任务线程结束时更新。
可重入锁mainLock,用于保护非线程安全的变量如workers,largestPoolSize,completedTaskCount。
等待线程池结束条件termination,用于控制超时等待线程池关闭。
CTL
使用一个ctl同时维护线程池的状态和线程数量,不仅仅是为了通过位运算提高效率,能够有效避免两者不一致的情况,如果2个地方存储,可能需要锁去保证一致性。(因为线程池源码同时操作线程池状态和线程数量的地方挺多的)
状态解释
- RUNNING是运行状态,线程池可以接收新任务
- SHUTDOWN是在调用shutdown()方法以后处在的状态。表示不再接收新任务,但队列中的任务可以执行完毕
- STOP是在调用shutdownNow()方法以后的状态。不再接收新任务,中断正在执行的任务,抛弃队列中的任务
- TIDYING表示所有任务都执行完毕
- TERMINATED为中止状态,调用terminated()方法后,尝试更新为此状态
状态变迁过程
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
线程池执行提交任务和执行任务
执行任务的方法
public void execute(Runnable command)
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
1.如果工作线程小于核心线程池数量,尝试新建一个工作线程执行任务addWorker。addWorker将会自动检查线程池状态和工作线程数,以防在添加工作线程的过程中,线程池被关闭。
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
2.如果创建工作线程执行任务失败,则任务入队列,如果入队列成功,我们仍需要二次检查线程池状态,以防在入队列的过程中,线程池关闭。如果线程池关闭,则回滚任务。
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
如果任务入队列失败,则尝试创建一个工作线程执行任务
int c = ctl.get();
if (workerCountOf(c) < corePoolSize)
if (addWorker(command, true))
return;
c = ctl.get();
if (isRunning(c) && workQueue.offer(command))
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
else if (!addWorker(command, false))
reject(command);
- 判断command是否为空
- 计算线程池中的线程数量,如果数量小于corePoolSize,就创建一个新线程执行任务
- 如果线程池正在运行状态,且写入队列成功。
- 再次获取线程池状态判断,如果线程状态变成了非运行状态,就从队列中移除任务,调用reject()方法执行饱和策略handler
- 如果线程池为空,就创建一个新线程执行任务。【在合适的时机,队列中的任务会被调度】
- 如果第3步判断没有通过,尝试建立线程执行任务,若没有成功,就执行饱和策略handler
添加线程池worker
private boolean addWorker(Runnable firstTask, boolean core)
retry:
for (;;)
int c = ctl.get(); //获取ctl
int rs = runStateOf(c);//获取运行状态
// Check if queue empty only if necessary. //判断运行状态和是否为core size=0,添加一个线程。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;)
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null))
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
finally
mainLock.unlock();
if (workerAdded)
t.start();
workerStarted = true;
finally
if (! workerStarted)
addWorkerFailed(w);
return workerStarted;
- 获取当前线程池的状态,如果是STOP,TIDYING,TERMINATED状态的话,则会返回false。如果现在状态是SHUTDOWN,但是firstTask不为空或者workQueue为空的话,那么直接返回false。【防止core size为0,添加一个线程】
- 通过自旋的方式,来添加一个ctl计数。
- 判断要添加的Worker是否是corePool,如果是的话,那么判断当前的workerCount是否大于corePoolsize,否则判断是否大于maximumPoolSize。
- 如果workerCount超出了线程池大小,直接返回false。
- 如果小于的话,那么判断是否成功将WorkerCount通过CAS操作增加1,如果增加成功的话。则break retry,进行到第3步。
- 否则判断当前线程池的状态,如果现在获取到的状态与进入自旋的状态不一致的话,那么则通过continue retry重新进行状态的判断。
- 如果满足了的话,那么则创建一个新的Worker对象,然后获取线程池的重入锁后,判断当前线程池的状态,如果当前线程池状态为STOP,TIDYING,TERMINATED的话,那么调用decrementWorkerCount将workerCount减一,然后调用tryTerminate停止线程池,并且返回false。
- 如果状态满足的话,那么则在workers中将新创建的worker添加,并且重新计算largestPoolSize,然后启动Worker中的线程开始执行任务。
- 在finally中检查workerStarted,如果为false则走添加worker失败的逻辑。
- 加锁,判空。从工作线程集移除工作线程 ,然后工作线程数减-1。
- 检查是否线程池关闭,关闭则执行相关工作。
添加线程池worker失败
private void addWorkerFailed(Worker w)
final ReentrantLock mainLock = this.mainLock;
//加锁
mainLock.lock();
try
//判空
if (w != null)
//从工作线程集移除工作线程
workers.remove(w);
//然后工作线程数减-1
decrementWorkerCount();
//检查是否线程池关闭
tryTerminate();
finally
mainLock.unlock();
final void tryTerminate()
//自旋尝试关闭线程池
for (;;)
int c = ctl.get();
//如果线程池正在运行,或正在关闭且队列不为空,则返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//如果工作线程不为空,则中断空闲工作线程
if (workerCountOf(c) != 0) // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
//线程池已关闭,任务队列为空,工作线程为0,更新线程池状态为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0)))
try
//执行结束工作
terminated();
finally
//线程池已结束
ctl.set(ctlOf(TERMINATED, 0));
//唤醒等待线程池结束的线程
termination.signalAll();
return;
finally
mainLock.unlock();
// else retry on failed CAS
//如果onlyOne为true,只中断最多一个空闲工作线程
private void interruptIdleWorkers(boolean onlyOne)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
//遍历工作线程集
for (Worker w : workers)
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) //锁打开说明,工作线程空闲
try
//如果工作线程非中断,且空闲,尝试获取锁,获取锁成功,则中断工作线程
t.interrupt();
catch (SecurityException ignore)
finally
w.unlock();
//如果是只中断一个空闲线程,则结束本次中断空闲线程任务
if (onlyOne)
break;
finally
mainLock.unlock();
线程池执行任务
调用时机:addWorker里面会调用work.start,work.start即work.run被调用,runWorker被work.run调用,所以worker开始工作了。
final void runWorker(Worker w)
Thread wt = Thread.currentThread(); //当前线程
Runnable task = w.firstTask; //工作线程任务
w.firstTask = null;
//任务线程的锁状态默认为-1,此时解锁+1,变为0,即锁打开状态,允许中断,在任务未执行之前,不允许中断。
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try
//如果任务不为null,即创建工作线程成功,并执行任务,如果为null(即在线程池执行任务的时候,创建工作线程失败,任务入队列),从任务队列取一个任务。
while (task != null || (task = getTask()) != null)
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//如果线程池正在Stop,则确保线程中断;
//如果非处于Stop之后的状态,则判断是否中断,如果中断则判断线程池是否已关闭
//如果线程池正在关闭,但没有中断,则中断线程池
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try
beforeExecute(wt, task);
Throwable thrown = null;
try
//执行任务
task.run();
catch (RuntimeException x)
thrown = x; throw x;
catch (Error x)
thrown = x; throw x;
catch (Throwable x)
thrown = x; throw new Error(x);
finally
afterExecute(task, thrown);
finally
//任务线程完成任务数量加1,释放锁
task = null;
w.completedTasks++;
w.unlock();
//任务已执行完不可以中断
completedAbruptly = false;
finally
processWorkerExit(w, completedAbruptly);
有两个关键点:取队列的任务逻辑、所有任务都执行完毕的处理逻辑。
取队列的任务逻辑
private Runnable getTask()
boolean timedOut = false; // Did the last poll() time out?
for (;;)
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//如果线程池处于STOP状态,或者SHUTDOWN且等待队列为空,则工作线程数-1
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))
decrementWorkerCount();
return null;
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//同时满足下面条件,则工作线程数-1
//1. 线程数大于最大线程数 或者 有时间限制且超时。
//2. 线程数大于1 或者 等待队列为空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty()))
if (compareAndDecrementWorkerCount(c))
return null;
//减少工作线程数量失败
continue;
try
//如果非超时则直接take,否则等待keepAliveTime时间,poll任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
catch (InterruptedException retry)
timedOut = false;
所有任务都执行完毕的处理逻辑
private void processWorkerExit(Worker w, boolean completedAbruptly)
//如果是被突然中断的,需要对线程数-1。正常结束的,都已经在gettask中-1了。
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
//汇总各个worker完成的任务
completedTaskCount += w.completedTasks;
//从工作线程集移除工作线程
workers.remove(w);
finally
mainLock.unlock();
//检查是否线程池关闭
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP))
if (!completedAbruptly)
//设置最小线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果等待队列不为空,则min设置为1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//检查现有线程数是否大于需要的线程数。
if (workerCountOf(c) >= min)
return; // replacement not needed
//添加线程
addWorker(null, false);
小结
这章节所有的函数调用关系,如下图:
函数调用关系,也可以当做思维导图来记忆~
线程池关闭
shutdown
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
先前提交的任务将会被工作线程执行,新的线程将会被拒绝。这个方法
不会等待提交的任务执行完,我们可以用awaitTermination来等待任务执行完。
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use @link #awaitTermination awaitTermination
* to do that.
*
* @throws SecurityException @inheritDoc
*/
public void shutdown()
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
//检查线程访问权限
checkShutdownAccess();
//更新线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断空闲工作线程
interruptIdleWorkers();
//线程池关闭hook
onShutdown(); // hook for ScheduledThreadPoolExecutor
finally
mainLock.unlock();
tryTerminate();
private void checkShutdownAccess()
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
for (Worker w : workers)
//遍历工作线程集,检查任务线程访问权限
security.checkAccess(w.thread);
finally
mainLock.unlock();
private void advanceRunState(int targetState)
for (;;)
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
private void interruptIdleWorkers()
interruptIdleWorkers(false);
private void interruptIdleWorkers(boolean onlyOne)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
for (Worker w : workers)
//遍历工作线程集
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) //锁打开说明,工作线程空闲
try
//如果工作线程非中断,且空闲,尝试获取锁,获取锁成功,则中断工作线程
t.interrupt();
catch (SecurityException ignore)
finally
w.unlock();
if (onlyOne)
//如果是只中断一个空闲线程,则结束本次中断空闲线程任务
break;
finally
mainLock.unlock();
shutdownNow
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
尝试停止正在执行的任务,停止等待任务线程的处理,任务队列将会被排空,
并返回任务队列中的任务集。
这个方法不会等待已执行的任务结束,我们用awaitTermination来等待任务执行完
* <p>This method does not wait for actively executing tasks to
* terminate. Use @link #awaitTermination awaitTermination to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via @link Thread#interrupt, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException @inheritDoc
*/
public List<Runnable> shutdownNow()
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
//检查工作线程权限
checkShutdownAccess();
//更新线程池状态为STOP
advanceRunState(STOP);
//中断空闲工作线程
interruptWorkers();
//清空任务队列,并放到tasks集合中
tasks = drainQueue();
finally
mainLock.unlock();
//尝试结束线程池
tryTerminate();
return tasks;
/**
* Drains the task queue into a new list, normally using
* drainTo. But if the queue is a DelayQueue or any other kind of
* queue for which poll or drainTo may fail to remove some
* elements, it deletes them one by one.
*/
private List<Runnable> drainQueue()
//这个方法很简单,不再说了
BlockingQueue<Runnable> q = workQueue;
List<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty())
for (Runnable r : q.toArray(new Runnable[0]))
if (q.remove(r))
taskList.add(r);
return taskList;
立即关闭线程与关闭线程池的不同是,对于关闭线程池,先前提交的任务将会被工作线程执行,新的线程将会被拒绝;而立即关闭线程,尝试停止正在执行的任务,停止等待任务线程的处理,任务队列将会被排空,并返回任务队列中的任务集。这两个方法都不会等待任务执行完或任务结束。
awaitTermination
当前线程阻塞,直到
1. 等所有已提交的任务(包括正在跑的和队列中等待的)执行完
2. 或者等超时时间到
3. 或者线程被中断,抛出InterruptedException
然后返回true(shutdown请求后所有任务执行完毕)或false(已超时)
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
//自旋等待线程线程结束条件
for (;;)
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
finally
mainLock.unlock();
小结
shutdownNow()能立即停止线程池,正在跑的和正在等待的任务都停下了。这样做立即生效,但是风险也比较大;
shutdown()只是关闭了提交通道,用submit()是无效的;而内部该怎么跑还是怎么跑,跑完再停。
shutdown()后,不能再提交新的任务进去;但是awaitTermination()后,可以继续提交。
awaitTermination()是阻塞的,返回结果是线程池是否已停止(true/false);shutdown()不阻塞。
参考资料:
网络资料
备注:
转载请注明出处:https://blog.csdn.net/WSYW126/article/details/105206243
作者:WSYW126
以上是关于Java中的线程池——ThreadPoolExecutor源代码分析的主要内容,如果未能解决你的问题,请参考以下文章