线程池的设计思想
Posted 学习路上的孤独者
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池的设计思想相关的知识,希望对你有一定的参考价值。
项目中在很多场景中都会使用到JAVA的多线程,使用到多线程则势必会用到线程池;接下来我们一起围绕以下几点疑问,一起探讨下线程池的设计思想:
线程池的需求是什么,为什么会有线程池?
线程池中线程怎么实现回收和再利用?
程序中怎么判断线程池是空闲还是繁忙?
多线程的特点
深入线程池之前,先看下多线程的特点:
异步
异步与同步的最大区别是:异步非阻塞
并行
多个线程可以并行执行,提高服务的吞吐量
为什么会有线程池?
设想下我们使用多线程处理业务,每个业务请求过来,则new一个线程,依次下去,每来个请求则创建个线程,会出现什么问题?
线程的创建未做限制,容易超出系统负载,节点机器直接挂掉
线程之间的上下文切换,会带来额外的性能开销
因此,在使用多线程时,需要避免线程的频繁创建、销毁;可以提前创建好N个线程放入池中,存放线程的池化技术则为线程池。
JAVA提供的线程池
JAVA的 java.util.concurrent 包中以提供以下几种线程池:
// 创建一个定时线程池,支持定时及周期性任务执行
Executors.newScheduledThreadPool();
// 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,
// 若无可回收,则新建线程
Executors.newCachedThreadPool();
// 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,
// 保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
Executors.newSingleThreadExecutor()
// 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
Executors.newFixedThreadPool()
// JDK1.8 版本加入 创建一个具有抢占式操作的线程池
Executors.newWorkStealingPool();
线程池构造探究
JDK 1.8引入的 newWorkStealingPool 除外,其他几种线程池,内部构造器最终都会 new ThreadPoolExecutor() ,ThreadPoolExecutor是线程池的真正实现;ThreadPoolExecutor构造器设置了线程池关键的参数信息,如下:
public ThreadPoolExecutor(int corePoolSize, //核心线程数
int maximumPoolSize, //最大线程数
long keepAliveTime, //线程存活时间
TimeUnit unit, // 线程存活的时间单位
BlockingQueue<Runnable> workQueue, //阻塞队列
ThreadFactory threadFactory,//创建线程的线程工厂
RejectedExecutionHandler handler //拒绝策略) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池构造的几个关键参数:
-
corePoolSize:核心线程数
-
maximumPoolSize:最大线程数
-
keepAliveTime:临时扩充的线程的存活时间
-
unit:临时扩充的线程的存活时间的单位
-
workQueue:阻塞队列
-
threadFactory:创建线程的线程工厂
-
handler :拒绝策略
如:corePoolSize传5,maximumPoolSize传10,则表示当任务处于繁忙状态时,线程最多会创建10个,当任务处于空闲状态时,非核心线程会被回收,最终常驻在线程池中线程是5个。
核心线程与最大线程如同软件项目人员,比如:
公司有5个JAVA开发人员,而新的项目正好需要5个JAVA开发人员,那么这5个JAVA开发人员就是核心员工;过段时间又中标一个项目,又需要5个JAVA开发人员,于是找外包公司临时借调了5个,则临时借调的5个开发人员就是临时扩充的员工;那么整个公司目前有10个JAVA开发人员,项目结束后,外包的5个开发人员会返回给外包公司,公司中始终保留5个核心的JAVA开发人员。
线程池执行探究
线程池中的线程这么实现回收与再利用?
回收:运行线程,会运行run方法,run方法执行结束,则线程的生命周期就结束了,JVM会自动回收
再利用:唯一能够重复利用的办法就是不结束线程,但是不可让线程一直循环处理,否则会一直占用cpu,应该做到当前有任务的时候执行,没任务的时候阻塞,因此使用队列可以很好的解决这个问题。
程序中怎么判断线程池是空闲还是繁忙?
只需要看队列中是否有记录即可。
ThreadPoolExecutor 的 execute 方法是线程的真正执行方法,代码如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// ctl存储线程数量,线程状态,3位存储状态,29位存储数量
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 创建核心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 将任务添加到队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
// 拒绝
reject(command);
else if (workerCountOf(recheck) == 0)
//创建非核心检查
addWorker(null, false);
}
else if (!addWorker(command, false))
// 拒绝
reject(command);
}
execute 方法中核心逻辑在 addWorker() 方法中,一起探讨下 addWorker 方法,源码如下:
private boolean addWorker(Runnable firstTask, boolean core) {
// 计数过程
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 操作
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
// goto 语句
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 以下是核心逻辑
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// worker是一个线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁保证线程安全
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 放入线程池
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker 方法做了以下几个核心事情:
-
创建线程(是否为核心线程,取决于参数)
-
启动线程
-
判断线程是否超过最大值
线程池的执行逻辑如下图:
以上是关于线程池的设计思想的主要内容,如果未能解决你的问题,请参考以下文章
newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段