线程池任务异常处理

Posted 苦咖啡-coffe

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池任务异常处理相关的知识,希望对你有一定的参考价值。

当线程池中执行的业务任务出现异常后是怎么处理的,这个点没有详细看过,因为我们都会在业务中捕获异常,但是频繁被人问道,看了一下源代码,有RuntimeException、Error、Throwable 三个异常捕获,然后做了一些后续的操作,下面分析具体异常处理流程。

一、ThreadPoolExceute.execute() 方法中业务代码异常分析


1.1 线程池线程工作入口runWorker() 方法

该方法是在Wroker类的run方法中直接调用,也就是线程池中任务线程run方法触发。

1.2 runWorker方法核心代码如下

final void runWorker(Worker w) 
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        //1标识是正常结束还是异常结束,默认是异常中断,正常结束后面单独赋值
        boolean completedAbruptly = true;
        try 
            while (task != null || (task = getTask()) != null) 
                w.lock();
                try 
                    //2预留的一个入口,目前是空的,没干什么
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try 
                        //3任务run方法执行,因为runWorker是执行在线程的run方法中,这里直接调用执行task.run没问题!
                        //4任务执行异常则会在run方法中抛异常。
                        task.run();
                     catch (RuntimeException x) 
                        //6抛异常
                        thrown = x; throw x;
                     catch (Error x) 
                        //6抛异常
                        thrown = x; throw x;
                     catch (Throwable x) 
                        //6抛异常
                        thrown = x; throw new Error(x);
                     finally 
                       //5当业务流程抛异常后,将异常抛出去前会优先执行这里的afterExecute方法,该方法也是一个扩展点,目前是空方法
                        afterExecute(task, thrown);
                    
                 finally 
                    //7捕获6中抛出来的异常,将任务结束
                    task = null;
                    //8当前worker线程完成任务
w.completedTasks++;
                    //9释放锁, finally执行完之后异常再抛到外层即while自旋结束。
                    w.unlock();
                
            
            //10因为是异常结束,所以这段代码不被执行
            completedAbruptly = false;
         finally 
            //11内部异常被finally拦截,执行processWorkerExit方法,当该方法执行结束后该工作线程结束即一个工作线程因为业务异常结束。具体操作看看processWorkerExit方法。
            processWorkerExit(w, completedAbruptly);
        
    

      标记4中调用业务任务run方法执行任务,如果业务抛出异常则标记6中会捕获各种异常,包括JVM错误,并且thrown变量存储异常信息。

   异常被捕获后执行try、catch、finally中钩子方法afterExecute(Runnable task,Throwable throw),而目前该钩子方法为空。到这一步业务异常会被抛出。

    在标记7、8、9捕获,释放任务、释放当前工作线程锁,因为这层只有finally没有catch所以异常继续被抛出,到这里runWorker方法自旋结束。最外层finally代码块执行,即processWorkerExit(Worker w,boolean completedAbruptly)方法执行,该方法执行完后,工作线程任务执行结束,同样该线程也结束。

1.3 processWorkerExit方法源码

private void processWorkerExit(Worker w, boolean completedAbruptly) 
        //1按照外面传递的参数是true,所以执行decrementWorkerCount方法。
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        //2方法逻辑很简单就是将工作线程数减一
            decrementWorkerCount();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            //3修改线程池工作完成任务数量即累加当前工作线程已完成任务
            completedTaskCount += w.completedTasks;
            //4将当前工作线程对象从HashSet即工作队列中删除
            workers.remove(w);
         finally 
            mainLock.unlock();
        

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) 
            //5因为异常情况这里传递参数为true所以不执行
            if (!completedAbruptly) 
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            
            //6添加一个空任务工作线程
            addWorker(null, false);
        
    

        processWorkerExit(Worker w, boolean completedAbruptly)方法执行中使用mainLock加锁,因为这里要操作工作线程集合即HashSet<Worker> 集合。

        方法执行到标记2更新ctl变量减少工作线程数,执行到标记4将当前工作线程从HashSet<Worker>集合删除。这是如果调用getPoolSize()则工作线程数相比异常之前会减一。

        当方法执行到标记6时会添加一个空任务来新增一个工作线程,具体代码见下面分析。

1.4 addWorker方法

//1新建任务线程
Worker w = null;
try 
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) 
        //2要操作HashSet<Worker>集合所以要加锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            //3线程池运行状态
            int rs = runStateOf(ctl.get());
            //4如果运行状态小于shutdown则执行if,因为当前是Running状态所以满足
            if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) 
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                //5HashSet<Worker>类添加工作线程
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                //6工作线程添加成功
                workerAdded = true;
            
         finally 
            //7锁释放
            mainLock.unlock();
        
        if (workerAdded) 
           //8工作线程启动,又开始执行runWorker方法
            t.start();
            workerStarted = true;
        
    
 finally 
    if (! workerStarted)
        addWorkerFailed(w);

        addWorker方法就干了一件事即新添加一个Worker对象并启动其线程执行Worker类run方法。这样processWorkerExit方法执行完毕,也就是业务异常后的逻辑都处理完毕,即Worker类run方法中的自旋方法runWorker结束,之前执行业务任务异常的工作线程就此结束,新创建工作线程添加成功并启动。

1.5 总结

        通过上面分析发现,当线程任务报异常后会被runWorker中的try、catch、finally捕获,但是在抛出捕获异常前会调用processWorkerExit方法,该方法中做具体处理,其中就包括新生成一个工作线程添加到工作队列中即HashSet<Worker>,并将当前工作线程即当前Worker类从HashSet<Worker>中移除。这些事情处理完后会将当前执行线程异常抛出即runWorker 方法中的while自旋结束,这样当前工作线程run方法结束,线程结束。因为没有异常处理器所以异常会被抛出并打印栈异常信息。

二、ThreadPoolExceute.submit() 方法中业务代码异常分析


        使用submit添加的任务都是有返回值的任务,也就是返回Future接口实现类即FutureTask,然后从FutureTask中获得执行结果。

2.1 submit方法任务提交

//Callable任务被封装为RunnableFuture任务类并调用execute方法,同时吧RunnableFuture类返回给应用程序
public <T> Future<T> submit(Callable<T> task) 
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    

        上面submit方法中会将Callable接口封装为FutureTask类而FutureTask类是实现了RunnableFuture所以这个类既可以获得执行结果,又可以作为线程任务。

2.2 submit方法中调用execute方法

        submit方法中调用execute方法,最后执行的还是ThreadPoolExecute组件内部Worker类的runWorker方法而这个方法中会直接调用FutureTask中run方法,那我们看一下这个类中run方法执行流程。

2.3 FutureTask类中run方法

//FutureTask任务类中run方法包装,其实线程中run方法中会直接调用FutureTask任务类的run方法即当前方法
public void run() 
            //1判断FutureTask任务状态,如果任务状态不是NEW即新建状态则结束
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try 
            //2任务赋值,将执行的任务类赋值给变量
            Callable<V> c = callable;
            if (c != null && state == NEW) 
                V result;
                boolean ran;
                try 
                    //3调用应用程序实现Callable接口类任务并返回结果
                    result = c.call();
                    ran = true;
                 catch (Throwable ex) 
                    //5如果业务任务异常则将异常复制给结果,这里后续再详细看
                    result = null;
                    ran = false;
                    setException(ex);
                
                //4如果任务无异常即正常执行完则将结果赋值给变量
                if (ran)
                    set(result);
            
         finally 
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        
    

        通过上面代码分析可以看出任务执行后有两步操作,要么是正常执行完,要么是执行异常,处理异常结果。下面分析一下正常执行完后结果赋值处理。

2.4 FutureTask中的set方法

protected void set(V v) 
        //1设置FutureTask任务状态从new变为compile
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) 
            //2如果状态修改成功则将执行结果赋值给outcome变量
            outcome = v;
           //3执行结果赋值给变量后FutureTask任务状态达到终态
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        
    

        如果业务任务不出现异常则FutureTask类中run方法执行完之后调用set方法修改FutureTask任务类状态并将结果赋值给变量。

        注意:上面方法中finishCompletion() 方法中有任务队列这个概念,这个要深入追究一下具体在什么场景下回出现。

2.5 FutureTask类中setException异常类

//FutureTask任务类run方法中异常处理
 protected void setException(Throwable t) 
        //1更新任务状态为处理中
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) 
            //2执行结果赋值,这里的结果为异常
            outcome = t;
            //3更新任务状态为异常
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        
    

从上面方法看,当业务任务执行异常后不会像execute中执行的Runnable任务一样将异常直接抛出来而是将异常赋值给返回值。

2.6 FutureTask类中get方法

//调用FutureTask类get方法获取执行结果
 public V get() throws InterruptedException, ExecutionException 
        int s = state;
        //1如果任务还没结束则等待,否则调用report方法返回结果
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        //2结果返回
        return report(s);
    

2.7 report方法返回结果

//FutureTask任务类执行结果获取
    private V report(int s) throws ExecutionException 
        //1outcome就是任务执行结果
        Object x = outcome;
        //2判断FutureTask任务状态,如果是终态即NORMAL就返回结果即如果业务任务没有异常则正常返回结果
        if (s == NORMAL)
            return (V)x;
        //3如果不是NORMAL则抛异常,如果任务是被取消、中断则抛一个取消异常
        if (s >= CANCELLED)
            throw new CancellationException();
        //4如果业务中出现异常则新生成一个异常并抛出来,也就是说FutureTask任务类的业务异常会在get时体现出来。    
        throw new ExecutionException((Throwable)x);
    

2.8 总结

2.8.1 FutureTask任务异常处理

从上面代码分析可以看出,如果任务是FutureTask类则业务任务异常后不会影响到线程池工作线程而是run方法中有捕获检测异常并发异常赋值给结果存储变量中,当调用FutureTask类的get方法获取执行结果时会判断FutureTask任务当前状态,如果是NORMAL则正常返回执行结果,如果是取消则返回取消异常,否则返回业务中具体异常。

2.8.2 Runnable 任务异常处理

Runnable类中run方法中没有对异常做处理所以Worker类的runWorker类中做处理。而FutureTask任务类run方法中对业务异常做了处理所以不会影响到任务工作类。

三、延伸思考


3.1 ThreadPoolExecute线程池的实现除了核心的几个成员变量,主要处理逻辑在Worker类和runWorker方法中。这里的Worker内部类巧妙使用完成了线程任务执行、外部类方法调用、外部类私有属性访问,如果不使用Worker内部类能否实现线程池。

3.2 shutdown方法中空闲工作线程被标记为中断,而shutdownNow方中所有工作线程被标记为中断,在线程池这个场景下他们有什么实质性区别。

以上是关于线程池任务异常处理的主要内容,如果未能解决你的问题,请参考以下文章

线程池任务异常处理

线程池任务异常处理

线程池任务异常处理

JAVA 线程池 其中一个线程执行失败 则线程重新执行或者重新提交任务 急

Java并发程序设计(10)线程池之任务结果的异常处理

pythonThreadpool线程池任务终止简单示例