Java 线程池抛 RejectedExecutionException 异常时 active threads 值分析

Posted 回归心灵

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 线程池抛 RejectedExecutionException 异常时 active threads 值分析相关的知识,希望对你有一定的参考价值。

背景介绍

系统日志有 RejectedExecutionException 异常,抛异常的线程池是业务程序为了某类处理过程比较慢的请求创建的,线程资源隔离,避免阻塞这个系统。异常信息类似下面输出:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@56235b8e rejected from 
java.util.concurrent.ThreadPoolExecutor@49c2faae
[Running, pool size = 80, active threads = 3, queued tasks = 3, completed tasks = 1674]

发现 active threads 的值为3,队列是满的,线程池的大小是 80,已经达到最大线程数。由于对 active threads 值确切的含义还不是很清晰,因此产生了疑问:活跃线程数才 3 个就触发了拒绝策略,其他的 77 个线程在干嘛呢?为什么 active threads 不等于 pool size?

源码分析

在官方文档上有说明 getActiveCount() 方法获取的值是活跃线程的大致值,不是准确值。只看文档说明还是不能有清晰的认识,活跃线程代表的是什么意思,值又是怎么获取的。带着这样的疑问就去JDK源码中找答案。

线程池对象调用 submit 方法提交任务后,会执行 ThreadPoolExecutor 类中的 execute 方法:

public void execute(Runnable var1) 
        if (var1 == null) 
            throw new NullPointerException();
         else 
            int var2 = this.ctl.get();
            if (workerCountOf(var2) < this.corePoolSize) 
                if (this.addWorker(var1, true)) 
                    return;
                

                var2 = this.ctl.get();
            

            if (isRunning(var2) && this.workQueue.offer(var1)) 
                int var3 = this.ctl.get();
                if (!isRunning(var3) && this.remove(var1)) 
                    this.reject(var1);
                 else if (workerCountOf(var3) == 0) 
                    this.addWorker((Runnable)null, false);
                
             else if (!this.addWorker(var1, false)) 
                this.reject(var1);
            

        
    

从日志输出情况来看,提交任务时队列已满,就会走最下面的 else if (!this.addWorker(var1, false))语句,由于线程池已经达到最大线程线程数,很明显 addWorker 方法会返回 false,从而执行拒绝逻辑抛出异常。

异常信息是 ThreadPoolExecutor 类的 toString 方法获取的

public String toString() 
        ReentrantLock var5 = this.mainLock;
        var5.lock();

        long var1;
        int var3;
        int var4;
        try 
            var1 = this.completedTaskCount;
            var4 = 0;
            var3 = this.workers.size();
            Iterator var6 = this.workers.iterator();

            while(var6.hasNext()) 
                ThreadPoolExecutor.Worker var7 = (ThreadPoolExecutor.Worker)var6.next();
                var1 += var7.completedTasks;
                if (var7.isLocked()) 
                    ++var4;
                
            
         finally 
            var5.unlock();
        

        int var11 = this.ctl.get();
        String var12 = runStateLessThan(var11, 0) ? "Running" : (runStateAtLeast(var11, 1610612736) ? "Terminated" : "Shutting down");
        return super.toString() + "[" + var12 + ", pool size = " + var3 + ", active threads = " + var4 + ", queued tasks = " + this.workQueue.size() + ", completed tasks = " + var1 + "]";
    

从代码中可以看出 pool size 的值是成员变量 this.workers 的元素数量大小,其类型是 HashSet。而 active threads 的值是 this.workers 中元素是 isLocked 状态的数量总和。isLocked 方法是判断元素(ThreadPoolExecutor.Worker 类型)的 state 变量是否为 0 ,ThreadPoolExecutor.Worker 继承了 AbstractQueuedSynchronizer 类,state 是 AQS 用于实现同步的一个状态变量。那在 ThreadPoolExecutor.Worker 中是代表什么含义呢?

在 runWorker 方法中有对 state 变量状态进行改变, runWorker 方法主要逻辑就是线程不断从队列中获取任务然后执行任务。当线程获取到任务后就会改变 state 状态,获取锁 var1.lock();,以及执行完任务后释放锁 var1.unlock();

final void runWorker(ThreadPoolExecutor.Worker var1) 
        Thread var2 = Thread.currentThread();
        Runnable var3 = var1.firstTask;
        var1.firstTask = null;
        var1.unlock();
        boolean var4 = true;

        try 
            while(var3 != null || (var3 = this.getTask()) != null) 
                var1.lock();
                if ((runStateAtLeast(this.ctl.get(), 536870912) || Thread.interrupted() && runStateAtLeast(this.ctl.get(), 536870912)) && !var2.isInterrupted()) 
                    var2.interrupt();
                

                try 
                    this.beforeExecute(var2, var3);
                    Object var5 = null;

                    try 
                        var3.run();
                     catch (RuntimeException var28) 
                        var5 = var28;
                        throw var28;
                     catch (Error var29) 
                        var5 = var29;
                        throw var29;
                     catch (Throwable var30) 
                        var5 = var30;
                        throw new Error(var30);
                     finally 
                        this.afterExecute(var3, (Throwable)var5);
                    
                 finally 
                    var3 = null;
                    ++var1.completedTasks;
                    var1.unlock();
                
            

            var4 = false;
         finally 
            this.processWorkerExit(var1, var4);
        

    

因此也就知道 active threads 表示的是正在执行任务中线程数。

程序复现

package org.yrs.jvm;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Author: yangrusheng
 * @Description:
 * @Date: Created in 9:19 2020/12/2
 * @Modified By:
 */
public class ThreadTest 

    public static void main(String[] args) 
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 80, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), new ThreadPoolExecutor.AbortPolicy());
        for (int i=0; i< 500000; i++) 
            try 
                threadPoolExecutor.submit(new Thread( () -> 
                    try 
                        Thread.sleep(1000L);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                ));
             catch (Exception e) 
                System.out.println(e);
            
            if (i % 100 == 0 ) 
                try 
                    Thread.sleep(2000L);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
        
    

总结

active thread 表示的是线程获取任务后,正在执行任务中的一个状态。在执行任务完成后会改变其状态为“not active thread”,然后会不停的去获取任务,直到下次成功获取新的任务,状态才又变为“active thread”。pool size 表示的是线程池中总的线程数,它们两个值之所以不相等或者说相差很大,是因为线程除了正在执行任务的状态,还有成功获取任务前的一段时间。

以上是关于Java 线程池抛 RejectedExecutionException 异常时 active threads 值分析的主要内容,如果未能解决你的问题,请参考以下文章

线程池中某个线程执行有异常,该如何处理?

线程池中某个线程执行有异常,该如何处理?

HttpClient连接池抛出大量ConnectionPoolTimeoutException: Timeout waiting for connection异常排查

面试官:线程池中线程抛了异常,该如何处理?

京东二面:线程池中的线程抛出了异常,该如何处理?大部分人都会答错!

AsyncTask、RejectedExecutionException 和任务限制