Java 并发编程线程池机制 ( 线程池执行任务细节分析 | 线程池执行 execute 源码分析 | 先创建核心线程 | 再放入阻塞队列 | 最后创建非核心线程 )
Posted 韩曙亮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 并发编程线程池机制 ( 线程池执行任务细节分析 | 线程池执行 execute 源码分析 | 先创建核心线程 | 再放入阻塞队列 | 最后创建非核心线程 )相关的知识,希望对你有一定的参考价值。
一、线程池执行任务细节分析
线程池执行细节分析 :
核心线程数 10 10 10 , 最大小成熟 20 20 20 , 非核心线程数 10 10 10 , 非核心线程空闲存活时间 60 60 60 秒 , 阻塞队列大小 10 10 10 个 ;
当有 Runnable
任务进入线程池后 ;
先查看 " 核心线程 " , 如果没有核心线程 , 先 创建核心线程 ;
如果有核心线程 , 则 查看核心线程是否有空闲的 ;
如果有空闲的核心线程 , 直接将该任务分配给该空闲核心线程 ;
如果没有空闲核心线程 , 则 查看核心线程数有没有满 ;
如果核心线程没有满 , 则 创建一个核心线程 , 然后执行该任务 ;
如果核心线程满了 , 将该任务放入 " 阻塞队列 " 中 , 查看阻塞队列是否已满 ;
如果阻塞队列没有满 , 直接 将任务放入阻塞队列中 ;
如果阻塞队列满了 , 则 查看是否能创建 " 非核心线程 " ;
如果能创建非核心线程 , 则 创建非核心线程 , 并执行该任务 ;
如果不能创建非核心线程 , 则 执行 " 拒绝策略 " ;
二、线程池执行 execute 源码分析
查看传入的 Runnable
任务是否为空 , 如果为空 , 就报异常 ;
if (command == null)
throw new NullPointerException();
获取当前线程池的状态 , 根据不同的状态 , 执行不同的操作 ;
/*
* 进行以下三个步骤处理:
*
* 1. 如果当前运行的线程 , 小于核心线程数 , 那么创建一个新的核心线程 ,
* 将传入的任务作为该线程的第一个任务 .
* 调用 addWorker 方法 , 会原子性检查运行状态和任务数量 ;
* 如果在不应该添加线程的情况下执行添加线程操作 , 就会发出错误警报 ;
* 如果该方法返回 false , 说明当前不能添加线程 , 此时就不要执行添加线程的操作了 ;
*
* 2. 如果任务被成功放入 线程池任务 队列 , 不管我们此时是否应该添加线程 , 都需要进行双重验证 ;
* 双重验证 : 添加到任务队列时验证一次 , 添加到线程执行时验证一次 ;
* 可能存在这种情况 , 在上次验证线程运行状态之后 , 有可能该线程就立刻被销毁了 ;
* 也可能存在进入该方法后 , 线程池被销毁的情况 ;
* 因此我们反复验证线程状态 , 如果需要在线程停止时回滚队列 , 如果没有线程就创建新线程 ;
*
* 3. 如果不能将任务放入队列中 , 尝试创建一个新线程 ;
* 如果创建线程失败 , 说明当前线程池关闭 , 或者线程池中线程饱和 , 此时拒绝执行该任务 ;
*/
int c = ctl.get();
上述 AtomicInteger ctl
线程池状态是很关键的原子变量 , 该原子变量中同时包含了线程池的线程数量 , 该值是一个组合的数值 ; 该 int 值
4
4
4 字节
32
32
32 位 , 前
3
3
3 位是线程池的状态位 , 剩下的
29
29
29 位是线程数 ;
/**
* 主池控制状态ctl是一个原子整数
* 两个概念领域
* workerCount,指示有效线程数
* 运行状态,指示是否运行、关闭等
*
* 为了将它们打包成一个整数,我们将workerCount限制为
* (2^29)-1(约5亿)个线程,而不是(2^31)-1(2
* 10亿)否则可代表。如果这曾经是一个问题
* 将来,变量可以更改为原子长度,
* 下面的移位/遮罩常数已调整。但在需要之前
* 因此,此代码使用int更快更简单。
*
* workerCount是已注册的工人数
* 允许启动,不允许停止。该值可能是
* 与活动线程的实际数量暂时不同,
* 例如,ThreadFactory在以下情况下无法创建线程:
* 当退出线程仍在执行时
* 终止前的簿记。用户可见池大小为
* 报告为工作集的当前大小。
*
* 运行状态提供主要的生命周期控制,具有以下值:
*
* 正在运行:接受新任务和处理排队的任务
* 关机:不接受新任务,但处理排队的任务
* 停止:不接受新任务,不处理排队的任务,
* 并中断正在进行的任务
* 整理:所有任务都已终止,workerCount为零,
* 正在转换为状态整理的线程
* 将运行终止的()钩子方法
* 终止:终止()已完成
*
* 这些值之间的数字顺序很重要,以允许
* 有序比较。运行状态随时间单调增加
* 时间,但不需要击中每个状态。这些转变是:
*
* 运行->关机
* 在调用shutdown()时,可能隐式地在finalize()中
* (运行或关闭)->停止
* 在调用shutdownNow()时
* 关机->整理
* 当队列和池都为空时
* 停止->整理
* 当池为空时
* 清理->终止
* 当终止的()钩子方法完成时
*
* 等待终止()的线程将在
* 国家终止。
*
* 检测从关闭到清理的过渡较少
* 比您希望的简单,因为队列可能会
* 非空后为空,关机状态下为空,但
* 只有在看到它是空的之后,我们才能终止
* workerCount为0(有时需要重新检查——请参阅
* 下)。
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
简单的机翻了下 , 如果查看详细的英文注释 , 查看 libcore/ojluni/src/main/java/java/util/concurrent/ThreadPoolExecutor.java 源码 ;
线程池的状态如下 , 有 5 5 5 种状态 ;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
判断当前的工作线程数 workerCountOf(c)
是否小于核心线程数 corePoolSize
;
如果小于 , 则添加核心线程 addWorker(command, true)
;
这里注意 , 来了新任务后 , 不是先将任务放入阻塞队列 , 而是检查核心线程 , 先尝试将核心线程部署满 ;
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
判断当前的线程池状态 isRunning(c)
是否正在执行处于 RUNNING
状态 , 如果当前线程池处于 RUNNING
状态 , 说明所有的核心线程都满了 , 则将任务队列放入阻塞队列中 workQueue.offer(command)
;
如果可以入队 , 重新检查状态 , 如果必要 回滚排队 ! isRunning(recheck) && remove(command)
, 重新检查状态通过后 , addWorker(null, false)
将任务添加如阻塞队列中 ;
入队失败 , 尝试添加非核心线程 !addWorker(command, false)
, 如果非核心线程也失败 , 则执行拒绝策略 reject(command)
;
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
以上是关于Java 并发编程线程池机制 ( 线程池执行任务细节分析 | 线程池执行 execute 源码分析 | 先创建核心线程 | 再放入阻塞队列 | 最后创建非核心线程 )的主要内容,如果未能解决你的问题,请参考以下文章
Java 并发编程线程池机制 ( ThreadPoolExecutor 线程池构造参数分析 | 核心线程数 | 最大线程数 | 非核心线程存活时间 | 任务阻塞队列 )