线程池要点分析
Posted 小猪快跑22
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池要点分析相关的知识,希望对你有一定的参考价值。
一、为什么要线程池?直接使用线程不行吗?
- 线程过多会带来额外的开销,包括线程的创建和销毁。线程的运行需要占用 CPU 的时间片,系统中处于运行状态的线程数量越多,那么每个线程单位时间内分配到的时间片就越少,线程调度带来的上下文切换就会越多,最终导致CPU真正用于计算的时间就会越少。
- 直接使用的线程是无法复用的,而线程池是能够实现线程复用的。
二、线程池的好处有哪些?
- 降低资源消耗:线程池能够实现线程复用,那么就可以降低线程的创建和销毁造成的损耗。
- 提高响应速度:当任务到达时,能够被等待任务的线程立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,甚至会降低系统的稳定性,比如 p_thread OOM等问题,使用线程池可以进行统一的分配和监控。
- 线程池的功能更加强大:比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
三、线程池的状态
- 线程池的状态是如何表示的?
ThreadPoolExecutor 实际上是通过将一个 32 位的 int 类型变量分割为两段,**高 3 位用来表示线程池的当前的状态,低 29 位表示线程池的当前线程数量。**就是用 ctl 这个变量来表示的,如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 用来表示线程数量的位数,即 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池所能表达的最大线程数,即一个“高3位全是0,低29位全是1”的数值
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 获取线程池状态:CAPACITY 取反后得到的就是高3位是1,低29位都是0,
// 然后和 c 做与运算,得到的就是 c 的高3位,即表示的是线程池的状态
private static int runStateOf(int c) return c & ~CAPACITY;
//获取工作线程数,原理和 runStateOf 方法产不多
private static int workerCountOf(int c) return c & CAPACITY;
// rs,即 runState,线程池的状态
// wc,即 workerCount,工作线程的数量
// 通过按位**或**运算来合并值
private static int ctlOf(int rs, int wc) return rs | wc;
线程池的几种状态:
// 运行状态,线程池正处于运行中
// -1左移29位等于 -536870912,-536870912的二进制表示就是 -536870912 的补码
// 原码: 0010 0000 0000 0000 0000 0000 0000 0000 按位取反得到反码
// 反码: 1101 1111 1111 1111 1111 1111 1111 1111 反码加1得到补码
// 补码: 1110 0000 0000 0000 0000 0000 0000 0000
// -536870912 的二进制转为十进制 就是补码减一 然后取反,即上面求二进制的逆过程
private static final int RUNNING = -1 << COUNT_BITS;
// 关闭状态,当调用 shutdown() 方法后处于这个状态,任务队列中的任务会继续处理,但不再接受新任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 停止状态,当调用 shutdownNow() 方法后处于这个状态
// 不再接受新的任务,任务队列中的任务也不再处理,会中断正在处理任务的线程
private static final int STOP = 1 << COUNT_BITS;
// 所有的任务都已经终止了,且 workCount = 0 ,即有效线程都等于0
private static final int TIDYING = 2 << COUNT_BITS;
//终止状态,在处于 TIDYING 状态后会立即调用 terminated() 方法,调用完成就会马上转到此状态
private static final int TERMINATED = 3 << COUNT_BITS;
生命周期转换如下图:
线程池执行的流程图如下:
四、线程池中任务处理类 Worker 为啥要自己实现AQS,而不是使用 ReentrantLock ?
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
先给出结论:因为 Worker
想要实现的是非重入锁,那么问题来了,为什么要非重入锁呢?目的是为了不中断正在执行任务的线程。看如下的分析:
- 线程的中断调用的是
interruptIdleWorkers
方法
private void interruptIdleWorkers(boolean onlyOne)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
for (Worker w : workers)
Thread t = w.thread
// 如果线程还没执行过中断,那么 调用 w.tryLock() 去获取锁
// 如果能获取到锁,那么就调用 interrupt() 方法去中断正在执行任务的线程
if (!t.isInterrupted() && w.tryLock())
try
t.interrupt();
catch (SecurityException ignore)
finally
w.unlock();
if (onlyOne)
break;
finally
mainLock.unlock();
// w.tryLock()方法如下:
public boolean tryLock()
// 调用 tryAcquire 方法
return tryAcquire(1);
/**
* 为什么 Worker 类要实现AQS,而不是直接用 ReentrantLock 即这里的this.mainLock呢?
* 因为 ReentrantLock 是可重入的,而调用 shutDown 会调用 tryLock 方法,而 tryLock 会调用此方法 ,
* 这个方法通过 CAS 把 state 的值从0设置为1 就是加锁的意思。0 表示的是 未上锁的状态, 1 表示已经加锁了。
* 什么时候加锁的呢? runTask 方法中 会调用 lock 方法加锁。
* 也就是说 state = 1的 worker 肯定是正在执行任务的线程,不可以被中断。
*
* 这里就是打破可重入锁,如果可重入的话,那么会中断正在执行的线程。
* 也可以看 setCorePoolSize 方法,方法中如果我们想动态减少 核心线程数量,
* 那么会走到 interruptIdleWorkers(); 就是中断空闲的线程。
*
* @param unused 这个变量就没用到
* @return
*/
protected boolean tryAcquire(int unused)
// 通过 CAS 将 state 从0设置为1,如果设置成功,表示成功获取了锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
return true;
return false;
state
值为0 表示未获取锁,1 表示成功获取了锁。
那是什么时候把这个 state
的值设置为 1 的呢?是在线程执行任务的时候设置的。
w.lock()
最终也是调用上面所说的 tryAcquire
来把 state
的值置为1。所以后续如调用 shutdown()
或者 tryTerminate()
等方法时,不会中断正在执行任务的线程。而如果是使用 ReentrantLock 的话,那么就会中断正在执行任务的线程,因为 ReentrantLock 是可重入的。
五、线程池中的线程执行任务过程中抛出异常,线程池是如何处理的?
先看看执行任务的代码如下:
final void runWorker(Worker w)
......
// 正常执行完成后,会重置为false,异常则不重置
boolean completedAbruptly = true;
try
while (task != null || (task = getTask()) != null)
......
try
beforeExecute(wt, task);
Throwable thrown = null;
try
task.run();
catch (RuntimeException x)
thrown = x;
throw x;
......
finally
task = null;
w.completedTasks++;
w.unlock();
completedAbruptly = false;
finally
// 处理线程退出
processWorkerExit(w, completedAbruptly);
private void processWorkerExit(Worker w, boolean completedAbruptly)
// 由于该线程执行任务发生异常,那么线程数减一
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount(); // 注释【1】
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
completedTaskCount += w.completedTasks;
workers.remove(w); // 移除此线程
finally
mainLock.unlock();
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP))
if (!completedAbruptly)
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min) // 注释【2】
return; // replacement not needed
// 这行代码什么时候会走到呢 ? 由于执线程执行任务是发生异常,
// 所以会走到注释【1】导致线程数减一,就有可能导致注释【2】的条件不成立
// 这时候就准备创建一个线程去处理任务队列中的任务,
// addWorker 中会判断这个线程是否能够创建,具体可看 addWorker 方法
addWorker(null, false);
也就是说,当线程执行任务时发生异常会有如下几步操作:
- 线程池中的线程数量减一
- 从hash表 workers 中移除引用,即此次执行任务出错的线程被回收
- 判断线程池中当前的线程数量,如果下雨
min
,就会调用addWorker(null, false)
添加一个线程。
以上是关于线程池要点分析的主要内容,如果未能解决你的问题,请参考以下文章