高并发多线程基础之ThreadPoolExecutor源代码分析
Posted 踩踩踩从踩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发多线程基础之ThreadPoolExecutor源代码分析相关的知识,希望对你有一定的参考价值。
前言
这篇主要讲述ThreadPoolExecutor的源码分析,贯穿整篇文章主要就 线程池的创建、参数分析、提交任务,通过源码及注释充分理解线程池的工作原理。
线程池
为什么需要需要线程池
首先线程并不是越多越好,过多的创建线程消耗大量的资源,反而达到适得其反的效果;如何正确创建线程并且去控制。
线程池的组成部分
线程池管理器:用于创建并管理线程池,包括创建线程池,销毁线程池,添加新任务;
工作线程:线程池中的线程、可以循环的执行任务、在没有任务时处于等待状态
任务接口:每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务收尾工作,任务执行的状态等
任务队列:用于存放没有处理的任务。提供一种缓存机制
下面是我写的另一篇文章包括为什么使用线程池,有哪些优点,以及几种Executors中提供给我们的工厂创建方法等
ThreadPoolExecutor源码解析
源码注释
因为太长了因此我把中文的意思整理了一下,大概介绍的是几个方面
- 为什么使用线程池
线程池解决两个不同的问题:它们通常在执行大量任务时提供改进的性能异步任务,由于减少了每个任务的调用开销,它们提供了一种界定和管理资源的手段,包括线程,在执行任务集合时使用。每个{@code ThreadPoolExecutor}还维护一些基本的统计信息,例如已完成任务的数量。
- 提供给我们使用的Executors的工厂方法
更方便的{@link Executors}工厂方法{@link执行器#newCachedThreadPool}(无限线程池,具有自动线程回收),{@link Executors#newFixedThreadPool}(固定大小的线程池)和{@linkExecutors#newSingleThreadExecutor}(单后台线程),即最常见用法的预配置设置情景。
- 核心和最大池大小
ThreadPoolExecutor自动调整大小,依据我们设置的corePoolSize(核心线程)和maximumPoolSize(最大线程大小);在方法{@link#execute(Runnable)}中提交新任务时,如果运行的线程少于corePoolSize,则会创建一个新线程创建以处理请求,即使其他工作线程闲散的。如果超过corePoolSize但小于maximumPoolSize线程运行时,将仅创建一个新线程,如果队列已满。通过设置corePoolSize和maximumPoolSize。并且这两个参数可以动态改变的
- 按需进行线程操作
默认情况下,即使是核心线程也会在最初创建并仅在新任务到达时启动,但这可以被覆盖,动态使用方法prestartCoreThread 或者 prestartAllCoreThreads 。如果需要,您可能需要预启动线程,使用非空队列构造池
- 创建新线程
使用{@link ThreadFactory}创建新线程,如果未指定{@link Executors#defaultThreadFactory}则用于创建所有线程都位于同一ThreadGroup 和具有相同{@code NORM_PRIORITY}优先级和非守护进程状态。通过提供不同的ThreadFactory,您可以更改线程的名称、线程组、优先级、守护进程状态等。如果{@code ThreadFactory}在请求时未能创建线程,执行器将继续,但可能无法执行任何任务。
- 保持活跃时间
如果池当前有多个corePoolSize线程,如果多余的线程空闲了更长的时间,那么它们将被终止;这提供了一种在以下情况下减少资源消耗的方法:池未被积极使用。如果池变得更活跃,稍后,将构造新线程。在关闭之前终止。默认情况下,“保持活动”策略仅当存在多个corePoolSize线程时适用。仅当存在多个corePoolSize线程时适用。但是方法{@link#allowCoreThreadTimeOut(布尔值)}可用于只要keepAliveTime值不为零
在后面的源代码中就可以看出 默认核心线程是不会超时,只有空闲的线程达到keepAliveTime 才会被杀掉,但是源代码中提供了allowCoreThreadTimeOut 方法只要keepAliveTime 不为0,并且设置allowCoreThreadTimeOut 则可以超时杀死线程
- 任务队列
任何{@link BlockingQueue}都可用于传输和保持提交的任务。此队列的使用与池大小交互:
如果运行的线程少于corePoolSize,则执行器始终更喜欢添加新线程而不是排队
如果corePoolSize或多个线程正在运行,则执行器总是更喜欢将请求排队,而不是添加新请求线程
如果请求无法排队,则会创建一个新线程,除非这将超过maximumPoolSize,在这种情况下,任务将拒绝
排队的一般策略有三种:
直接切换。对于工作来说是一个不错的默认选择队列是一个{@link SynchronousQueue},它将任务交给线程而不是拿着它们。这里是对任务进行排队的尝试 如果没有线程可立即运行,则将失败,因此将构造新线程。此政策可避免在以下情况下锁定:处理可能具有内部依赖关系的请求集。直接切换通常需要无限的MaximumPoolSize来实现避免拒绝新提交的任务。这反过来又承认了
无界队列。LinkedBlockingQueue 不超过corePoolSize 线程将永远不会被创建。(以及maximumPoolSize的值因此没有任何影响。)这可能是适当的每个任务都完全独立于其他任务,因此任务不能相互影响执行;例如,在网页服务器中。而这种排队方式有助于消除短暂的请求爆发,它承认命令继续到达时的无限工作队列增长平均速度比处理速度快
有界队列。有界队列,例如ArrayBlockingQueue 有助于防止资源耗尽 与有限的MaximumPoolSize一起使用,但可能更难,调整和控制。队列大小和最大池大小可以交易,互相关闭:使用大型队列和小型池可以最大限度地减少CPU使用率、操作系统资源和上下文切换开销
- 拒绝策略
方法{@link#execute(Runnable)}中提交的新任务将被删除,当执行者被关闭时,以及执行器对最大线程和工作队列使用有限边界容量,并且已饱和。在这两种情况下,{@code execute}方法,则会出现拒绝策略。RejectedExecutionHandler rejectedExecution 提供了四个预定义的处理程序包括:
在默认的{@link ThreadPoolExecutor.AbortPolicy}中处理程序在拒绝
在{@link ThreadPoolExecutor.callerRunPolicy}中,线程 调用{@code execute}本身的运行任务。这提供了一个简单的反馈控制机制,将降低提交新任务
在{@link ThreadPoolExecutor.DiscardPolicy}中,一个任务无法执行的命令被简单地删除
在{@link ThreadPoolExecutor.DiscardOldestPolicy}中,如果执行器未关闭,任务位于工作队列的最前面删除,然后重试执行可能再次失败,导致重复此操作。
可以定义和使用其他类型的RejectedExecutionHandler 类特别是当政策设计为仅在特定条件下有效时
- 钩子方法
此类提供了{@code protected}可重写的 {@link#beforeExecute(线程,可运行)}和 {@link#afterExecute(Runnable,Throwable)}调用的方法,在执行每个任务之前和之后。这些可以用来操纵执行环境;例如,重新初始化线程局部变量、收集统计信息或添加日志项。此外,可以重写方法{@link#terminated}以执行人完成后需要进行的任何特殊处理完全终止。
- 队列维护
方法{@link#getQueue()}允许访问工作队列,用于监视和调试。将此方法用于强烈反对任何其他目的。提供了两种方法,{@link#remove(Runnable)}和{@link#purge}可用于当有大量排队的任务时,协助进行存储回收取消
- 终止线程池
程序中不再引用的池没有剩余的线程将自动{@code shutdown}。如果您希望确保回收未引用的池,可以调用shutdown
- 最后包括Doug Lea 写的一个扩展实例
重写一个或多个受保护的挂钩方法,例如下面是一个子类,它添加了一个简单的暂停/恢复功能:
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) { super(...); }
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}}
成员属性
主要是主池控制状态 中各个状态, ctl 是个 Integer 的原子变量用来记录线程池状态 和 线程池中线程个数 ,
//默认是RUNNING状态,线程个数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程个数
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程最大个数(低29位)00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//运行状态存储在高阶位中
//(高3位):11100000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
这里描述的是两个概念领域:
- workerCount,指示有效线程数
workerCount是已注册的任务数,
- 运行状态提供主要的生命周期控制,具有以下值
正在运行(RUNNING):接受新任务和处理排队的任务
关机(SHUTDOWN):不接受新任务,但处理排队的任务
停止(STOP):不接受新任务,不处理排队的任务,并中断正在进行的任务
整理(TIDYING):所有任务都已终止,workerCount为零,正在转换为状态整理的线程将运行终止的钩子方法
终止(TERMINATED):terminated方法调用完成以后的状态
- 线程池状态之间进行转换
RUNNING ->shutdown:显式调用 shutdown() 方法,或者隐式调用了 finalize(),它里面调用了 shutdown() 方法。
RUNNING or SHUTDOWN -> STOP:显式调用 shutdownNow() 方法时候。
SHUTDOWN -> TIDYING:当线程池和任务队列都为空的时候。
STOP -> TIDYING:当线程池为空的时候。
TIDYING -> TERMINATED:当 terminated() hook 方法执行完成时候。
任务队列属性,用作存放注册的任务
private final BlockingQueue<Runnable> workQueue;
在添加任务时,可以添加callable,这是在AbstractExecutorService 抽象方法中做了统一的封装,然后统一调用的是execute方法
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
然后包括主线程锁,存储工作线程的集合
//这里用来锁定工作线程和相关记录的安全性
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
private final Condition termination = mainLock.newCondition();
后面的包括拒绝handler ,以及活跃时间,是否允许核心线程超时杀死、核心线程数、最大线程数这些属性在核心关键方法中充分使用到。
private volatile RejectedExecutionHandler handler;
private volatile ThreadFactory threadFactory;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
属性这里设置了默认拒绝策略, 默认的拒绝策略就是执行时抛出异常exception包括下面线程池为我们自定义好了集中拒绝策略可供我们使用的
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
构造方法
默认构造方法,需要传递几个重要的参数
- 第一个参数为核心线程数,表示默认会创建核心线程去处理任务
- 第二个参数为最大线程数,表示当任务队列满了过后,会继续创建线程数为
- 第三和第四个参数一起用的,超时时间,当大于核心线程空置超过5秒则销毁线程
- 第五个参数,则是创建一个任务队列。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
默认的拒绝策略和创建线程的工厂,可以使用的两种线程工厂
DefaultThreadFactory就是创建一个普通的线程,非守护线程,优先级为5。
PrivilegedThreadFactory 创建的线程工厂,增加了两个特性:ClassLoader和AccessControlContext,从而使运行在此类线程中的任务具有与当前线程相同的访问控制和类加载器。
/**
*线程工厂捕获访问控制上下文和类加载器
*/
static class PrivilegedThreadFactory extends DefaultThreadFactory {
private final AccessControlContext acc;
private final ClassLoader ccl;
PrivilegedThreadFactory() {
super();
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// Calls to getContextClassLoader from this class
// never trigger a security check, but we check
// whether our callers have this permission anyways.
sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
// Fail fast
sm.checkPermission(new RuntimePermission("setContextClassLoader"));
}
this.acc = AccessController.getContext();
this.ccl = Thread.currentThread().getContextClassLoader();
}
public Thread newThread(final Runnable r) {
return super.newThread(new Runnable() {
public void run() {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
Thread.currentThread().setContextClassLoader(ccl);
r.run();
return null;
}
}, acc);
}
});
}
}
execute方法
执行添加任务方法
用于提交任务到线程池中
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
*分三步进行:
*
* 1. 如果正在运行的线程少于corePoolSize,请尝试
*以给定命令作为第一个线程启动新线程
*任务。对addWorker的调用以原子方式检查运行状态和
*workerCount,从而防止可能增加
*在不应该的情况下,通过返回false执行线程。
*
* 2. 如果任务可以成功排队,那么我们仍然需要
*再次检查是否应该添加线程
*(因为自上次检查以来,已有的已死亡)或
*自进入此方法后,池已关闭。所以我们
*重新检查状态,如有必要,在以下情况下回滚排队
*已停止,如果没有线程,则启动新线程。
*
* 3. 如果我们无法将任务排队,那么我们将尝试添加一个新任务
*线。如果失败了,我们知道我们已经被关闭或饱和了
*所以拒绝这个任务。
*/
//第一步
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//第二步
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//第三步
else if (!addWorker(command, false))
reject(command);
}
这里分为下面三部分
- 如果正在运行的线程少于corePoolSize,请尝试以给定命令作为第一个线程启动新线程任务。对addWorker的调用以原子方式检查运行状态和workerCount,从而防止可能增加在不应该的情况下,通过返回false执行线程。
- 如果任务可以成功排队,那么我们仍然需要再次检查是否应该添加线程(因为自上次检查以来,已有的已死亡)或自进入此方法后,池已关闭。所以我们重新检查状态,如有必要,在以下情况下回滚排队已停止,如果没有线程,则启动新线程。
addWorker方法
检查是否可以添加与当前工作线程相关的新工作线程池状态和给定的界限(核心或最大值)。如果是,相应地调整工人数量,如有可能,调整创建并启动新的辅助进程,运行firstTask作为其第一项任务。如果池停止或停止,此方法返回false有资格关闭。如果线程工厂在被询问时无法创建线程。如果线程由于线程工厂返回,创建失败null,或由于异常(通常是中的OutOfMemoryError)而导致Thread.start()),我们可以干净地回滚。
private boolean addWorker(Runnable firstTask, boolean core) {
//第一部分
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 仅在必要时检查队列是否为空。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// 其他CAS由于workerCount更改而失败;重试内部循环
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
//第二部分
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
主要分两部分
- 第一部分通过cas操作 增加线程池线程数,利用双重循环防止出现异常的情况
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 仅在必要时检查队列是否为空。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// 其他CAS由于workerCount更改而失败;重试内部循环
}
}
会返回false的情况,当前线程池状态为 STOP,TIDYING,TERMINATED;
当前线程池状态为 SHUTDOWN 并且已经有了第一个任务则会直接返回false;
- 第二部分发安全的把任务添加到 workers 里面,启动任务执行。
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//在保持锁定的同时重新检查。
//在ThreadFactory出现故障或
//在获得锁之前关闭。
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
- 最后在做了一个当工作线程启动失败时,会将任务删除掉,并尝试看是否终止掉线程池
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
工作任务Worker
工作任务主要维护中断控制状态线程运行任务,以及其他次要簿记。此类有机会扩展AbstractQueuedSynchronizer为了简化获取和释放每个锁周围的锁任务执行。这可以防止发生以下情况的中断:用于唤醒等待来自的任务的工作线程而是中断正在运行的任务。我们实现了一个简单的非重入互斥锁而非使用ReentrantLock,因为我们不希望辅助任务能够当调用池控制方法时重新获取锁,如setCorePoolSize。此外,为了抑制中断,直到线程实际上开始运行任务,我们初始化锁将状态设置为负值,并在启动时将其清除(在运行工人)。
run方法
将主运行循环委托给外部runWorker
public void run() {
runWorker(this);
}
runworker方法中做的循环触发
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
//如果池正在停止,请确保线程被中断;
//如果没有,请确保线程没有中断。这
//在第二种情况下需要重新检查才能处理
//清除中断时立即关闭循环
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
主工作运行循环。重复地从队列和执行它们,同时处理一些问题:
- 我们可以从最初的任务开始,在这种情况下不需要第一个。否则,只要池是运行时,我们从getTask获取任务。如果返回null,则工作进程由于池状态或配置更改而退出参数。其他退出由异常抛出导致外部代码,在这种情况下completedjustly有效通常导致processWorkerExit替换此线程。
- 在运行任何任务之前,将获取锁以防止其他池在任务执行时中断,然后确保除非池停止,否则此线程没有它的中断设置。
- 每个任务运行之前都会调用beforeExecute,这可能会引发异常,在这种情况下,我们会导致线程死亡(使用completedjustly true中断循环)未经处理这项任务。
- 假设在PreExecute正常完成之前,我们运行任务,正在收集任何抛出的异常以发送到afterExecute。我们分别处理RuntimeException和Error(两者都是规格保证了我们的陷阱)和任意丢弃。因为我们无法在Runnable.run中重新播放丢弃的内容,所以在退出时将它们包装在错误中(到线程的UncaughtExceptionHandler)。也会引发任何异常保守地导致线程死亡。
- task.run完成后,我们调用afterExecute,它可能还将引发异常,这也将导致线程死。根据JLS第14.20节,此例外情况是即使task.run抛出,也将生效。
- 异常机制的净效果是afterExecute线程的UncaughtExceptionHandler具有同样的准确性
我们可以提供的关于客户遇到的任何问题的信息用户代码。
interruptIdleWorkers方法
该方法用于中断可能正在等待任务的线程,并根据传入的参数 onlyone判断是否只中止一个参数
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutdown()方法和shutdownNow()方法
showdown方法 这里会将当前所有的线程状态更改为shutdown状态,线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭。正在执行的任务会继续执行下去,没有被执行的则中断。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
调用的 interruptIdleWorkers方法作用则是终止可能正在等待任务的线程
shutdownNow方法会将线程状态更改为STOP状态,正在执行的任务则被停止,没被执行任务的则返回。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
调用的interruptWorkers的作用则是 将所有的正在运行的任务终止掉,
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
并将当前的任务队列删除并返回
tasks = drainQueue();
线程池日志打印线程名称
这个名称默认是在线程创建工厂中自动给生成的,如果想要他打印的名称换一种,可以自定义个线程工厂,例如下面的,然后在用构造参数放进去,这就能达到自定义的效果了
static class MyThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "my-pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
总结
最后整个线程池,ThreadPoolExecutor 主要就分为几个模块,包括工作线程任务、任务队列、拒绝策略模块、线程创建工厂模块,整个线程池的生命周期贯穿整个线程池的运行。
以上是关于高并发多线程基础之ThreadPoolExecutor源代码分析的主要内容,如果未能解决你的问题,请参考以下文章