10问10答:你真的了解线程池吗?

Posted 阿里技术

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了10问10答:你真的了解线程池吗?相关的知识,希望对你有一定的参考价值。


《Java开发手册》中强调,线程资源必须通过线程池提供,而创建线程池必须使用 ThreadPoolExecutor 。手册主要强调利用线程池避免两个问题,一是线程过渡切换,二是避免请求过多时造成OOM。但是如果参数配置错误,还是会引发上面的两个问题。所以本节我们主要是讨论 ThreadPoolExecutor 的一些技术细节,并且给出几个常用的最佳实践建议。

我在查找资料的过程中,发现有些问题存在争议。后面发现,一部分原因是因为不同JDK版本的现实是有差异的。因此,下面的分析是基于当下最常用的版本JDK1.8,并且对于存在争议的问题,我们分析源码,源码才是最准确的。

1  corePoolSize=0会怎么样

这是一个争议点。我发现大部分博文,不论是国内的还是国外的,都是这样回答这个问题的:

  • 提交任务后,先判断当前池中线程数是否小于corePoolSize,如果小于,则创建新线程执行这个任务。


  • 否则,判断等待队列是否已满,如果没有满,则添加到等待队列。


  • 否则,判断当前池中线程数是否大于maximumPoolSize,如果大于则拒绝。


  • 否则,创建一个新的线程执行这个任务。


按照上面的描述,如果corePoolSize=0,则会判断等待队列的容量,如果还有容量,则排队,并且不会创建新的线程。

—— 但其实,这是老版本的实现方式,从1.6之后,实现方式就变了。我们直接看 execute 的源码(submit也依赖它),我备注出了关键一行:
     
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); // 注意这一行代码,添加到等待队列成功后,判断当前池内线程数是否为0,如果是则创建一个firstTask为null的worker,这个worker会从等待队列中获取任务并执行。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);

  • 线程池提交任务后,首先判断当前池中线程数是否小于corePoolSize


  • 如果小于则尝试创建新的线程执行该任务;否则尝试添加到等待队列。


  • 如果添加队列成功,判断当前池内线程数是否为0,如果是则创建一个firstTask为null的worker,这个worker会从等待队列中获取任务并执行。


  • 如果添加到等待队列失败,一般是队列已满,才会再尝试创建新的线程。


  • 但在创建之前需要与maximumPoolSize比较,如果小于则创建成功。


  • 否则执行拒绝策略。



上述问题需区分JDK版本。在1.6版本之后,如果 corePoolSize=0 ,提交任务时如果线程池为空,则会立即创建一个线程来执行任务(先排队再获取);如果提交任务的时候,线程池不为空,则先在等待队列中排队,只有队列满了才会创建新线程。

所以,优化在于,在队列没有满的这段时间内,会有一个线程在消费提交的任务;1.6之前的实现是,必须等队列满了之后,才开始消费。

2  线程池创建之后,会立即创建核心线程么

之前有人问过我这个问题,因为他发现应用中有些Bean创建了线程池,但是这个Bean一般情况下用不到,所以咨询我是否需要把这个线程池注释掉,以减少应用运行时的线程数(该应用运行时线程过多。)


不会。从上面的源码可以看出,在刚刚创建 ThreadPoolExecutor 的时候,线程并不会立即启动,而是要等到有任务提交时才会启动,除非调用了 prestartCoreThread / prestartAllCoreThreads 事先启动核心线程。

  • prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.


  • prestartAllCoreThreads:Starts all core threads.


3  核心线程永远不会销毁么

这个问题有点tricky。首先我们要明确一下概念,虽然在JavaDoc中也使用了“core/non-core threads”这样的描述,但其实这是一个动态的概念,JDK并没有给一部分线程打上“core”的标记,做什么特殊化的处理。这个问题我认为想要探讨的是闲置线程终结策略的问题。

在JDK1.6之前,线程池会尽量保持 corePoolSize 个核心线程,即使这些线程闲置了很长时间。这一点曾被开发者诟病,所以从JDK1.6开始,提供了方法 allowsCoreThreadTimeOut ,如果传参为true,则允许闲置的核心线程被终止。

请注意这种策略和 corePoolSize=0 的区别。我总结的区别是:

  • corePoolSize=0:在一般情况下只使用一个线程消费任务,只有当并发请求特别多、等待队列都满了之后,才开始用多线程。


  • allowsCoreThreadTimeOut=true && corePoolSize>1:在一般情况下就开始使用多线程(corePoolSize个),当并发请求特别多,等待队列都满了之后,继续加大线程数。但是当请求没有的时候,允许核心线程也终止。


所以 corePoolSize=0 的效果,基本等同于 allowsCoreThreadTimeOut=true && corePoolSize=1 ,但实现细节其实不同。


在JDK1.6之后,如果 allowsCoreThreadTimeOut=true ,核心线程也可以被终止。

4  如何保证线程不被销毁

首先我们要明确一下线程池模型。线程池有个内部类 Worker ,它实现了 Runnable 接口,首先,它自己要run起来。然后它会在合适的时候获取我们提交的 Runnable 任务,然后调用 任务的run() 接口。一个 Worker 不终止的话可以不断执行任务。

我们前面说的“线程池中的线程”,其实就是 Worker ;等待队列中的元素,是我们提交的 Runnable 任务。

每一个 Worker 在创建出来的时候,会调用它本身的 run() 方法,实现是 runWorker(this) ,这个实现的核心是一个 while 循环,这个循环不结束, Worker 线程就不会终止,就是这个基本逻辑。

  • 在这个while条件中,有个getTask()方法是核心中的核心,它所做的事情就是从等待队列中取出任务来执行:


  • 如果没有达到corePoolSize,则创建的Worker在执行完它承接的任务后,会用workQueue.take()取任务、注意,这个接口是阻塞接口,如果取不到任务,Worker线程一直阻塞。


  • 如果超过了corePoolSize,或者allowCoreThreadTimeOut,一个Worker在空闲了之后,会用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)取任务。注意,这个接口只阻塞等待keepAliveTime时间,超过这个时间返回null,则Workerwhile循环执行结束,则被终止了。


 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 看这里,核心逻辑在这里 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out?
for (;;) { int c = ctl.get(); int rs = runStateOf(c);
// Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }
int wc = workerCountOf(c);
// Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }
try { // 注意,核心中的核心在这里 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }


实现方式非常巧妙,核心线程(Worker)即使一直空闲也不终止,是通过 workQueue.take() 实现的,它会一直阻塞到从等待队列中取到新的任务。非核心线程空闲指定时间后终止是通过 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 实现的,一个空闲的 Worker 只等待 keepAliveTime ,如果还没有取到任务则循环终止,线程也就运行结束了。

引申思考

Worker 本身就是个线程,它再调用我们传入的 Runnable.run() ,会启动一个子线程么?如果你还没有答案,再回想一下 Runnable Thread 的关系。

5  空闲线程过多会有什么问题

笼统地回答是会占用内存,我们分析一下占用了哪些内存。首先,比较普通的一部分,一个线程的内存模型:

  • 虚拟机栈

  • 本地方法栈

  • 程序计数器


我想额外强调是下面这几个内存占用,需要小心:

  • ThreadLocal:业务代码是否使用了ThreadLocal?就算没有,Spring框架中也大量使用了ThreadLocal,你所在公司的框架可能也是一样。


  • 局部变量:线程处于阻塞状态,肯定还有栈帧没有出栈,栈帧中有局部变量表,凡是被局部变量表引用的内存都不能回收。所以如果这个线程创建了比较大的局部变量,那么这一部分内存无法GC。


  • TLAB机制:如果你的应用线程数处于高位,那么新的线程初始化可能因为Eden没有足够的空间分配TLAB而触发YoungGC。



  • 线程池保持空闲的核心线程是它的默认配置,一般来讲是没有问题的,因为它占用的内存一般不大。怕的就是业务代码中使用ThreadLocal缓存的数据过大又不清理。


  • 如果你的应用线程数处于高位,那么需要观察一下YoungGC的情况,估算一下Eden大小是否足够。如果不够的话,可能要谨慎地创建新线程,并且让空闲的线程终止;必要的时候,可能需要对JVM进行调参。


6  keepAliveTime=0会怎么样

这也是个争议点。有的博文说等于0表示空闲线程永远不会终止,有的说表示执行完立刻终止。还有的说等于-1表示空闲线程永远不会终止。其实稍微看一下源码知道了,这里我直接抛出答案。


在JDK1.8中, keepAliveTime=0 表示非核心线程执行完立刻终止。

默认情况下, keepAliveTime 小于0,初始化的时候才会报错;但如果 allowsCoreThreadTimeOut keepAliveTime 必须大于0,不然初始化报错。

7  怎么进行异常处理

很多代码的写法,我们都习惯按照常见范式去编写,而没有去思考为什么。比如:

  • 如果我们使用execute()提交任务,我们一般要在Runable任务的代码加上try-catch进行异常处理。


  • 如果我们使用submit()提交任务,我们一般要在主线程中,对Future.get()进行try-catch进行异常处理。


—— 但是在上面,我提到过, submit() 底层实现依赖 execute() ,两者应该统一呀,为什么有差异呢?下面再扒一扒 submit() 的源码,它的实现蛮有意思。

首先, ThreadPoolExecutor 中没有 submit 的代码,而是在它的父类 AbstractExecutorService 中,有三个 submit 的重载方法,代码非常简单,关键代码就两行:

 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }

正是因为这三个重载方法,都调用了 execute ,所以我才说 submit 底层依赖 execute 。通过查看这里 execute 的实现,我们不难发现,它就是 ThreadPoolExecutor 中的实现,所以,造成 submit execute 的差异化的代码,不在这。那么造成差异的一定在 newTaskFor 方法中。这个方法也就 new 了一个 FutureTask 而已, FutureTask 实现 RunnableFuture 接口, RunnableFuture 接口继承 Runnable 接口和 Future 接口。而 Callable 只是 FutureTask 的一个成员变量。

所以讲到这里,就有另一个Java基础知识点: Callable Future 的关系。我们一般用 Callable 编写任务代码, Future 是异步返回对象,通过它的 get 方法,阻塞式地获取结果。 FutureTask 的核心代码就是实现了 Future 接口,也就是 get 方法的实现:

 public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 核心代码 s = awaitDone(false, 0L); return report(s); }
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; // 死循环 for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); }
int s = state; // 只有任务的状态是’已完成‘,才会跳出死循环 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }

get 的核心实现是有个 awaitDone 方法,这是一个死循环,只有任务的状态是“已完成”,才会跳出死循环;否则会依赖UNSAFE包下的 LockSupport.park 原语进行阻塞,等待 LockSupport.unpark 信号量。而这个信号量只有当运行结束获得结果、或者出现异常的情况下,才会发出来。分别对应方法 set setException 。这就是异步执行、阻塞获取的原理,扯得有点远了。

回到最初我们的疑问,为什么 submit 之后,通过 get 方法可以获取到异常?原因是 FutureTask 有一个 Object 类型的 outcome 成员变量,用来记录执行结果。这个结果可以是传入的泛型,也可以是 Throwable 异常:

 public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
// get方法中依赖的,报告执行结果 private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }

FutureTask 的另一个巧妙的地方就是借用 RunnableAdapter 内部类,将 submit Runnable 封装成 Callable 。所以就算你 submit 的是 Runnable ,一样可以用 get 获取到异常。


  • 不论是用execute还是submit,都可以自己在业务代码上加try-catch进行异常处理。我一般喜欢使用这种方式,因为我喜欢对不同业务场景的异常进行差异化处理,至少打不一样的日志吧。


  • 如果是execute,还可以自定义线程池,继承ThreadPoolExecutor并复写其afterExecute(Runnable r, Throwable t)方法。


  • 或者实现Thread.UncaughtExceptionHandler接口,实现void uncaughtException(Thread t, Throwable e);方法,并将该handler传递给线程池的ThreadFactory


  • 但是注意,afterExecuteUncaughtExceptionHandler都不适用submit。因为通过上面的FutureTask.run()不难发现,它自己对Throwable进行了try-catch,封装到了outcome属性,所以底层方法executeWorker是拿不到异常信息的。


8  线程池需不需要关闭


一般来讲,线程池的生命周期跟随服务的生命周期。如果一个服务(Service)停止服务了,那么需要调用 shutdown 方法进行关闭。所以 ExecutorService.shutdown 在Java以及一些中间件的源码中,是封装在Service的 shutdown 方法内的。

如果是Server端不重启就不停止提供服务,我认为是不需要特殊处理的。

9  shutdown和shutdownNow的区别


  • shutdown => 平缓关闭,等待所有已添加到线程池中的任务执行完再关闭。


  • shutdownNow => 立刻关闭,停止正在执行的任务,并返回队列中未执行的任务。


本来想分析一下两者的源码的,但是发现本文的篇幅已经过长了,源码也贴了不少。感兴趣的朋友自己看一下即可。

10  Spring中有哪些和ThreadPoolExecutor类似的工具


SimpleAsyncTaskExecutor 每次请求新开线程,没有最大线程数设置.不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。
SyncTaskExecutor 不是异步的线程同步可以用SyncTaskExecutor,但这个可以说不算一个线程池,因为还在原线程执行。这个类没有实现异步调用,只是一个同步操作。
ConcurrentTaskExecutor Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类。
SimpleThreadPoolTaskExecutor 监听Spring’s lifecycle callbacks,并且可以和Quartz的Component兼容.是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类。

这里我想着重强调的就是 SimpleAsyncTaskExecutor ,Spring中使用的 @Async 注解,底层就是基于 SimpleAsyncTaskExecutor 去执行任务,只不过它不是线程池,而是每次都新开一个线程。

另外想要强调的是 Executor 接口。Java初学者容易想当然的以为 Executor 结尾的类就是一个线程池,而上面的都是反例。我们可以在JDK的 execute 方法上看到这个注释:

/*** Executes the given command at some time in the future. The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.*/

所以,它的职责并不是提供一个线程池的接口,而是提供一个“将来执行命令”的接口。真正能代表线程池意义的,是 ThreadPoolExecutor 类,而不是 Executor 接口。

最佳实践总结

  • 【强制】使用ThreadPoolExecutor的构造函数声明线程池,避免使用Executors类的 newFixedThreadPoolnewCachedThreadPool


  • 【强制】 创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。即threadFactory参数要构造好。


  • 【建议】建议不同类别的业务用不同的线程池。


  • 【建议】CPU密集型任务(N+1):这种任务消耗的主要是CPU资源,可以将线程数设置为N(CPU核心数)+1,比CPU核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用CPU的空闲时间。


  • 【建议】I/O密集型任务(2N):这种任务应用起来,系统会用大部分的时间来处理I/O交互,而线程在处理I/O的时间段内不会占用CPU来处理,这时就可以将CPU交出给其它线程使用。因此在I/O密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是2N。


  • 【建议】workQueue不要使用无界队列,尽量使用有界队列。避免大量任务等待,造成OOM。


  • 【建议】如果是资源紧张的应用,使用allowsCoreThreadTimeOut可以提高资源利用率。


  • 【建议】虽然使用线程池有多种异常处理的方式,但在任务代码中,使用try-catch最通用,也能给不同任务的异常处理做精细化。


  • 【建议】对于资源紧张的应用,如果担心线程池资源使用不当,可以利用ThreadPoolExecutor的API实现简单的监控,然后进行分析和优化。



线程池初始化示例:

 private static final ThreadPoolExecutor pool;
static { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build(); pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512), threadFactory, new ThreadPoolExecutor.AbortPolicy()); pool.allowCoreThreadTimeOut(true); }

  • threadFactory:给出带业务语义的线程命名。


  • corePoolSize:快速启动4个线程处理该业务,是足够的。


  • maximumPoolSize:IO密集型业务,我的服务器是4C8G的,所以4*2=8。


  • keepAliveTime:服务器资源紧张,让空闲的线程快速释放。


  • pool.allowCoreThreadTimeOut(true):也是为了在可以的时候,让线程释放,释放资源。


  • workQueue:一个任务的执行时长在100~300ms,业务高峰期8个线程,按照10s超时(已经很高了)。10s钟,8个线程,可以处理10 * 1000ms / 200ms * 8 = 400个任务左右,往上再取一点,512已经很多了。


  • handler:极端情况下,一些任务只能丢弃,保护服务端。




免费领取电子书

《Spring Boot 2.5开发实战》


本书包含了Spring Boot 2.5新特性、自动化配置原理、REST API开发、mysql、Redis高并发缓存、MongoDB、MQ消息队列、安全机制、 性能监控等核心知识点,带你上手实战!


扫码加阿里妹好友,回复“Spring Boot 2.5开发实战”获取吧~(若扫码无效,可通过微信号alimei4、alimei5、alimei6、alimei7直接添加)

以上是关于10问10答:你真的了解线程池吗?的主要内容,如果未能解决你的问题,请参考以下文章

10问10答:你真的了解线程池吗?

你真的熟悉数据连接池吗?手写实现连接池

Fiddler 使用10问10答(一)

你知道如何安全正确的关闭线程池吗?

[转]资深CTO:关于技术团队打造与管理的10问10答

拜托!你真会用线程池吗?