Java 线程池抛 RejectedExecutionException 异常时 active threads 值分析
Posted 回归心灵
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 线程池抛 RejectedExecutionException 异常时 active threads 值分析相关的知识,希望对你有一定的参考价值。
背景介绍
系统日志有 RejectedExecutionException 异常,抛异常的线程池是业务程序为了某类处理过程比较慢的请求创建的,线程资源隔离,避免阻塞这个系统。异常信息类似下面输出:
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@56235b8e rejected from
java.util.concurrent.ThreadPoolExecutor@49c2faae
[Running, pool size = 80, active threads = 3, queued tasks = 3, completed tasks = 1674]
发现 active threads 的值为3,队列是满的,线程池的大小是 80,已经达到最大线程数。由于对 active threads 值确切的含义还不是很清晰,因此产生了疑问:活跃线程数才 3 个就触发了拒绝策略,其他的 77 个线程在干嘛呢?为什么 active threads 不等于 pool size?
源码分析
在官方文档上有说明 getActiveCount() 方法获取的值是活跃线程的大致值,不是准确值。只看文档说明还是不能有清晰的认识,活跃线程代表的是什么意思,值又是怎么获取的。带着这样的疑问就去JDK源码中找答案。
线程池对象调用 submit 方法提交任务后,会执行 ThreadPoolExecutor 类中的 execute 方法:
public void execute(Runnable var1)
if (var1 == null)
throw new NullPointerException();
else
int var2 = this.ctl.get();
if (workerCountOf(var2) < this.corePoolSize)
if (this.addWorker(var1, true))
return;
var2 = this.ctl.get();
if (isRunning(var2) && this.workQueue.offer(var1))
int var3 = this.ctl.get();
if (!isRunning(var3) && this.remove(var1))
this.reject(var1);
else if (workerCountOf(var3) == 0)
this.addWorker((Runnable)null, false);
else if (!this.addWorker(var1, false))
this.reject(var1);
从日志输出情况来看,提交任务时队列已满,就会走最下面的 else if (!this.addWorker(var1, false))
语句,由于线程池已经达到最大线程线程数,很明显 addWorker 方法会返回 false,从而执行拒绝逻辑抛出异常。
异常信息是 ThreadPoolExecutor 类的 toString 方法获取的
public String toString()
ReentrantLock var5 = this.mainLock;
var5.lock();
long var1;
int var3;
int var4;
try
var1 = this.completedTaskCount;
var4 = 0;
var3 = this.workers.size();
Iterator var6 = this.workers.iterator();
while(var6.hasNext())
ThreadPoolExecutor.Worker var7 = (ThreadPoolExecutor.Worker)var6.next();
var1 += var7.completedTasks;
if (var7.isLocked())
++var4;
finally
var5.unlock();
int var11 = this.ctl.get();
String var12 = runStateLessThan(var11, 0) ? "Running" : (runStateAtLeast(var11, 1610612736) ? "Terminated" : "Shutting down");
return super.toString() + "[" + var12 + ", pool size = " + var3 + ", active threads = " + var4 + ", queued tasks = " + this.workQueue.size() + ", completed tasks = " + var1 + "]";
从代码中可以看出 pool size 的值是成员变量 this.workers 的元素数量大小,其类型是 HashSet。而 active threads 的值是 this.workers 中元素是 isLocked 状态的数量总和。isLocked 方法是判断元素(ThreadPoolExecutor.Worker 类型)的 state 变量是否为 0 ,ThreadPoolExecutor.Worker 继承了 AbstractQueuedSynchronizer 类,state 是 AQS 用于实现同步的一个状态变量。那在 ThreadPoolExecutor.Worker 中是代表什么含义呢?
在 runWorker 方法中有对 state 变量状态进行改变, runWorker 方法主要逻辑就是线程不断从队列中获取任务然后执行任务。当线程获取到任务后就会改变 state 状态,获取锁 var1.lock();
,以及执行完任务后释放锁 var1.unlock();
。
final void runWorker(ThreadPoolExecutor.Worker var1)
Thread var2 = Thread.currentThread();
Runnable var3 = var1.firstTask;
var1.firstTask = null;
var1.unlock();
boolean var4 = true;
try
while(var3 != null || (var3 = this.getTask()) != null)
var1.lock();
if ((runStateAtLeast(this.ctl.get(), 536870912) || Thread.interrupted() && runStateAtLeast(this.ctl.get(), 536870912)) && !var2.isInterrupted())
var2.interrupt();
try
this.beforeExecute(var2, var3);
Object var5 = null;
try
var3.run();
catch (RuntimeException var28)
var5 = var28;
throw var28;
catch (Error var29)
var5 = var29;
throw var29;
catch (Throwable var30)
var5 = var30;
throw new Error(var30);
finally
this.afterExecute(var3, (Throwable)var5);
finally
var3 = null;
++var1.completedTasks;
var1.unlock();
var4 = false;
finally
this.processWorkerExit(var1, var4);
因此也就知道 active threads 表示的是正在执行任务中线程数。
程序复现
package org.yrs.jvm;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Author: yangrusheng
* @Description:
* @Date: Created in 9:19 2020/12/2
* @Modified By:
*/
public class ThreadTest
public static void main(String[] args)
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 80, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), new ThreadPoolExecutor.AbortPolicy());
for (int i=0; i< 500000; i++)
try
threadPoolExecutor.submit(new Thread( () ->
try
Thread.sleep(1000L);
catch (InterruptedException e)
e.printStackTrace();
));
catch (Exception e)
System.out.println(e);
if (i % 100 == 0 )
try
Thread.sleep(2000L);
catch (InterruptedException e)
e.printStackTrace();
总结
active thread 表示的是线程获取任务后,正在执行任务中的一个状态。在执行任务完成后会改变其状态为“not active thread”,然后会不停的去获取任务,直到下次成功获取新的任务,状态才又变为“active thread”。pool size 表示的是线程池中总的线程数,它们两个值之所以不相等或者说相差很大,是因为线程除了正在执行任务的状态,还有成功获取任务前的一段时间。
以上是关于Java 线程池抛 RejectedExecutionException 异常时 active threads 值分析的主要内容,如果未能解决你的问题,请参考以下文章
HttpClient连接池抛出大量ConnectionPoolTimeoutException: Timeout waiting for connection异常排查