深入理解ThreadPoolExecutor第二弹
Posted 风在哪
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解ThreadPoolExecutor第二弹相关的知识,希望对你有一定的参考价值。
从源头解析ThreadPoolExecutor第二弹—ThreadPoolExecutor的内部类
ThreadPoolExecutor主要包括如下内部类:
其中AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy表示任务的拒绝策略,当线程池的线程数量达到最大值并且阻塞队列已满时,根据这些不同的策略对新提交的任务进行不同的处理。它们都实现了RejectedExecutionHandler接口。
而Worker代表我们执行的任务,我们提交的任务会被包装成Worker对象,然后再执行我们的任务或者加入阻塞队列中。
首先来看看RejectedExecutionHandler接口。
RejectedExecutionHandler接口
定义了用于处理不能被ThreadPoolExecutor执行的任务的方法。
该接口只包含了一个方法:rejectedExecution主要用于拒绝执行提交的任务。
public interface RejectedExecutionHandler {
/*
当ThreadPoolExecutor不能执行新任务的时候该方法将会被调用。
当没有更多的线程或者阻塞队列已满时会发生这样的情况
在没有其他替代方法的情况下,该方法可能会抛出未经检查的RejectedExecutionException,
该异常将被传播到execute的调用方
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
接下来看看它的四个实现。
CallerRunsPolicy类
拒绝任务策略的实现之一:直接在调用execute方法的线程中运行被拒绝的任务,除非执行器已关闭,在这种情况下,该任务被丢弃
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
/**
* 在调用线程中执行任务,除非执行器已经被关闭,此时这个任务将被丢弃
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 如果线程池没有关闭,直接在该线程中运行此方法
if (!e.isShutdown()) {
r.run();
}
}
}
AbortPolicy类
当线程和阻塞队列均到达最大容量时,直接拒绝执行任务,并抛出RejectedExecutionException异常
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
/*
总是直接抛出RejectedExecutionException异常
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
DiscardPolicy
不做任何事情,相当于直接丢弃任务。
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
/**
* 不做任何事情,这也具有丢弃任务r的效果
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy
从名字就可以看出是丢弃最老的任务,并运行最新提交的任务。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
/*
获取并且忽略线程池下一个要执行的任务
如果此时线程池可用,就调用execute方法执行任务r
除非线程池关闭,在这种情况下,任务r将被丢弃
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
总结
ThreadPoolExecutor的四个内部类通过实现RejectedExecutionHandler接口,实现了不同的拒绝策略,帮助我们在线程池满时拒绝执行任务。
通过这个接口,将具体拒绝的策略抽象出来,做到了解耦。所以在ThreadPoolExecutor构造函数中使用RejectedExecutionHandler作为参数,让程序员自己选择具体的拒绝策略。
Worker
Worker其实是ThreadPoolExecutor线程池中保存的工作线程,线程池主要是使用Worker来执行任务。
Worker类继承自AQS,并且实现了Runnable接口。它主要维持运行任务线程的中断控制状态,以及其他次要的信息。
它继承自AQS主要用于每次执行任务时简化锁的获取和释放。这可以防止中断用于唤醒等待任务的工作线程,而不是中断正在运行的任务。
我们实现了简单的不可重入互斥锁而不是使用ReentrantLock,因为作者不想这个Worker task在setCorePoolSize这种线程池控制方法调用时能重新获取到锁。
此外,为了压制中断直到开始执行任务,我们将锁的状态初始化为一个负值,并且直到它开始运行才清除这个值。
AQS作用
这里为什么实现了AQS呢,主要有以下两点:
- 将锁的粒度细化到每个Worker
- 如果多个worker使用同一个锁的话,当一个worker running持有锁的时候,其他worker就无法执行
- 直接使用cas获取,避免阻塞
- 如果这个锁使用阻塞获取,那么在多worker的情况下执行shutdown,如果某个worker此时处于running状态,无法获取锁,那么shutdown线程就阻塞住了,显然是不合理的。
注意 Worker 实现锁的目的与传统锁的意义不太一样。其主要是为了控制线程是否可interrupt(这点可以在runWorker方法中看出来,后续会讲到),以及其他的监控,如线程是否 active(正在执行任务)。
实现Runnable接口
这里实现Runnable接口是为了复用线程,减少创建线程带来的性能损耗。
因为Worker中有Thread属性,这个属性可以用于运行runnable任务,当一个任务运行完成以后,再提交
Worker属性
首先来看看它包含哪些字段:
/** 当前worker的执行线程,如果ThreadFactory执行失败,那么其为null */
// thread的作用就是执行worker中的run方法,也就是线程池中的真正执行任务的线程。
final Thread thread;
/** 当前工作线程要执行的任务,可能为null*/
Runnable firstTask;
/** 线程的任务计数器,记录线程已经完成多少任务 */
volatile long completedTasks;
构造函数
设置AQS的state值为-1,表示禁止中断任务直到运行为止,也就是在创建worker期间不响应中断
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 调用ThreadFactory产生新的线程
this.thread = getThreadFactory().newThread(this);
}
AQS方法实现
这里实现的是tryAcquire和tryRelease,所以这里是线程独占的锁。
// 判断线程是否正在独占资源
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 尝试获取线程对应的锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试释放获取的锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
锁相关方法
调用AQS的方法进行加锁与解锁。
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
其他
如果获取到了线程对应的锁,并且线程不为空,线程没有被中断,那么就可以中断正在运行的线程。
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
委托给外部的runWorker方法运行任务。
public void run() {
runWorker(this);
}
总结
本篇文章介绍了ThreadPoolExecutor的几个内部类,包括定义拒绝策略的内部类以及包装Runnable的Worker类。
通过了解ThreadPoolExecutor内部的拒绝策略,我们在使用线程池的过程中可以选择与业务相关的合适的拒绝策略,或者我们也可以模仿ThreadPoolExecutor的拒绝策略来自定义适合我们自己业务的拒绝策略。
通过解析Worker类,我们可以更清楚的了解ThreadPoolExecutor是如何工作的,它为什么继承自AQS,继承AQS的作用等,只有了解了这些知识,才能更深入的了解线程池,并且根据业务需要定义自己的线程池!
以上是关于深入理解ThreadPoolExecutor第二弹的主要内容,如果未能解决你的问题,请参考以下文章