线程池的原理
Posted Tomcat那些事儿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池的原理相关的知识,希望对你有一定的参考价值。
前面的文章中(),我们提到Connector在处理请求时,会使用到线程池这样一个概念,这样在处理请求时就可以避免每次重新生成并销毁线程带来的开销。
线程池的概念我们搞开发的可能常听到,甚至都能说出个一二三。本文将以Tomcat的线程池为例,来分析一下线程池的工作原理。
线程的创建及销毁
我们首先来看,为什么说每次处理任务的时候再创建并销毁线程效率不高。
线程的创建
在Java中,创建一个线程,直观上我们可以感受到的是这样一行代码
Thread t = new Thread();
当然,在其构造方法中,会对线程的各项参数进行初始化,这个时候,只是在java层面创建了一个对象而已。
创建线程后,为了让它干活,我们会执行线程的start方法。此方法内部会调用到一个native方法start0,下面是其对于的native代码。
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_StartThread");
JavaThread *native_thread = NULL;
{
MutexLocker mu(Threads_lock);
if (java_lang_Thread::is_stillborn(JNIHandles::resolve_non_null(jthread)) ||
java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
throw_illegal_thread_state = true;
} else {
jlong size =
java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
size_t sz = size > 0 ? (size_t) size : 0;
native_thread = new JavaThread(&thread_entry, sz);
if (native_thread->osthread() != NULL) {
// Note: the current thread is not being used within "prepare".
native_thread->prepare(jthread);
}} }
Thread::start(native_thread);
JVM_END
我们看到代码中至少进行了native线程的创建,根据线程栈配置大小并分配内存,再对应到操作系统的线程上。
如果有大量的请求需要处理,就涉及到频繁的创建和销毁,而且线程数量太多的时候,还会有上下文切换影响执行效率的问题。
线程池
线程池,故名思意,就是存放线程的池子,在需要使用线程的时候,可以方便快捷的获取。
《Java并发编程实战》中对线程池概括如下
线程池,从字面含义来看,是指管理一组同构工作线程的资源池。线程池是与工作队列密切相关的,其中在工作队列中保存了所有等待执行的任务。工作者线程的任务很简单: 从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。
使用线程池执行任务,比为每个任务分配一个线程的优势有:
通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。
另一个额外好处是,当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务执行
同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。
我们来看Tomcat中对于线程池是如何使用的。每个Connector都是通过Executor来处理请求的,可能是共享的,也可能是独立的。
Connector的独立Executor创建
public void createExecutor() {
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}========================================================
StandardThreadExecutor的Executor创建
protected void startInternal() throws LifecycleException {
taskqueue = new TaskQueue(maxQueueSize);
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
executor.setThreadRenewalDelay(threadRenewalDelay);
if (prestartminSpareThreads) {
executor.prestartAllCoreThreads();
}
taskqueue.setParent(executor);
}
通过上面的代码,我们可以归纳下创建一个Executor的大致步骤:
指定一个taskQueue
声明一个ThreadFactory
根据前两步创建的对象,来声明一个Executor对象
指定taskQueue的parent,这一步主要用于在特定情况下条件判断
声明了Executor之后,对于任务的执行,就直接交给它就OK了,看就这么简单:
Executor executor = getExecutor(); //此处无论是否配置executor,都有值,因为没配置的话每个Connector会创建自己的Executor,而不使用共享的
if (dispatch && executor != null) {
executor.execute(sc);以上是NIO通道的任务处理
BIO的处理也一个样:
getExecutor().execute(new SocketProcessor(wrapper));
我们再来看Tomcat的Executor对于任务执行是如何实现的:
每个任务都是一个实现了Runnable接口的类
public void execute(Runnable command) {
execute(command,0,TimeUnit.MILLISECONDS);
}
execute包含一个重写的方法:
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
} }
而这些任务在提交之后,最终会由到JDK定义的ThreadPoolExecutor的逻辑进行执行。由于JDK自带的ThreadPoolExecutor实现功能已经很好,大部分情况下可以直接拿来用,而特别的情况下,可以基于此进行定制。Tomcat的线程池实现也是这样的,在其之上增加了线程的统计等逻辑。
在这个execute方法内部,代码如下:
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
任务被执行的逻辑分以下几种情况:
在少于核心线程数的情况下,创建新的工作线程执行任务。
否则直接放到等待队列中
如果队列已满同时不能创建工作线程,就会拒绝任务
我们再来看创建工作线程和拒绝这两个流程。
首先来看addWorker中的最核心代码逻辑如下,可以简要概括为以下二点
使用自定义线程工厂创建线程
启动新创建的线程(线程启动后会执行传入的任务)
w = new Worker(firstTask);
final Thread t = w.thread;
if (workerAdded) {
t.start();
workerStarted = true;
}
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//使用前面自定义的ThreadFactory进行线程的
this.thread = getThreadFactory().newThread(this);
}
同时,上面的代码提到另外几个概念
等待队列
拒绝策略
等待队列是上面指定的TaskQueue,其使用LinkedBlockingQueue实现,可以指定队列的最大容量。在线程达到最大线程数,同时queue也满了的时候,任务的执行就会被拒绝。
我们来看,任务不能被处理时的拒绝策略
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
此时会提到一个的RejectedExecutionHandler接口,其对应提供了多种适用于不同场景的拒绝方式。
默认使用的是AbortPolicy,
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException();
}
会直接抛出RejectedExecutionException。
而对于DiscardOldestPolicy就会把队列中旧的任务丢弃,转而执行新的。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
} }
而Executor中的另一个重要的参数超时时间,是在超过核心线程数的时候,如果有空闲且超时的线程就会被销毁,从而使池维持在指定的核心线程数上。
有了前面的内容后,我们来看JDK自带的ExecutorService提供了几种便捷的创建线程池的方式:
newFixedThreadPool
newCachedThreadPool
newSingleThreadExecutor
...
实质上其实就是指定了上面提到的几个参数。例如,固定大小的线程池其实就是最大线程数和核心线程数一致的。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
而可缓存的线程池其实就是指定核心线程数,最大线程数是Integer的最大值的一种Executor。
Executor同时还支持另外一种可以返回结果的任务调用,即将实现了Callable接口的任务提交到Executor中,并使用Future再获取返回值。
<T> Future<T> submit(Callable<T> task);
在Tomcat的组件启动中,也有用到这种形式的任务,例如下面的代码
List<Future<Void>> results = new ArrayList<>();
for (int i = 0; i < children.length; i++) {
results.add(startStopExecutor.submit(new StartChild(children[i])));
}
for (Future<Void> result : results) {
try {
result.get();// 获取结果
} catch (Exception e) {
log.error(sm.getString("containerBase.threadedStartFailed"), e);
fail = true;
}}
以上,是线程池的基本原理,以及其在Tomcat中的实际应用情况,快用它替换掉你开发工作中手动new出来的线程吧。
以上是关于线程池的原理的主要内容,如果未能解决你的问题,请参考以下文章
newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段