处理来自 Java ExecutorService 任务的异常

Posted

技术标签:

【中文标题】处理来自 Java ExecutorService 任务的异常【英文标题】:Handling exceptions from Java ExecutorService tasks 【发布时间】:2011-01-15 22:17:56 【问题描述】:

我正在尝试使用 Java 的 ThreadPoolExecutor 类以固定数量的线程运行大量重量级任务。每个任务都有很多地方可能会因为异常而失败。

我已经继承了ThreadPoolExecutor 并且我已经覆盖了afterExecute 方法,该方法应该提供在运行任务时遇到的任何未捕获的异常。但是,我似乎无法让它工作。

例如:

public class ThreadPoolErrors extends ThreadPoolExecutor 
    public ThreadPoolErrors() 
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    

    protected void afterExecute(Runnable r, Throwable t) 
        super.afterExecute(r, t);
        if(t != null) 
            System.out.println("Got an error: " + t);
         else 
            System.out.println("Everything's fine--situation normal!");
        
    

    public static void main( String [] args) 
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() 
                    public void run() 
                        throw new RuntimeException("Ouch! Got an error.");
                    
                
        );
        threadPool.shutdown();
    

这个程序的输出是“一切正常——情况正常!”即使提交给线程池的唯一 Runnable 抛出异常。有什么线索知道这里发生了什么吗?

谢谢!

【问题讨论】:

你从来没有问过任务的未来,那里发生了什么。整个服务执行器或程序不会崩溃。异常被捕获并包装在 ExecutionException 下。如果你调用future.get(),他会重新抛出。 PS:future.isDone() [请阅读真正的 api 名称] 将返回 true,即使可运行对象错误地完成。因为任务是真实完成的。 【参考方案1】:

警告:需要注意的是这种解决方案会阻塞调用线程。


如果你想处理任务抛出的异常,那么通常使用Callable而不是Runnable更好。

Callable.call() 被允许抛出检查异常,这些异常被传播回调用线程:

Callable task = ...
Future future = executor.submit(task);
try 
   future.get();
 catch (ExecutionException ex) 
   ex.getCause().printStackTrace();

如果Callable.call() 抛出异常,这将被包裹在ExecutionException 中并由Future.get() 抛出。

这可能比继承ThreadPoolExecutor 更可取。如果异常是可恢复的,它还使您有机会重新提交任务。

【讨论】:

> Callable.call() 允许抛出已检查异常,这些异常会传播回调用线程: 请注意,抛出的异常只会传播到调用线程如果 future.get() 或其重载版本被调用。 很完美,但是如果我并行运行任务并且不想阻塞执行怎么办? 不要使用这个解决方案,因为它破坏了使用 ExecutorService 的整个目的。 ExecutorService 是一种异步执行机制,能够在后台执行任务。如果你在执行后立即调用future.get(),它将阻塞调用线程,直到任务完成。 这个解决方案的评价不应该那么高。 Future.get() 同步工作,并将充当阻止程序,直到 Runnable 或 Callable 被执行,并且如上所述破坏了使用 Executor Service 的目的 正如#nhylated 所指出的,这值得一个jdk BUG。如果未调用 Future.get(),则任何来自 Callable 的未捕获异常都会被静默忽略。非常糟糕的设计.... 只花了 1 天以上的时间来找出一个使用它的库,而 jdk 默默地忽略了异常。而且,这在 jdk12 中仍然存在。【参考方案2】:

来自docs:

注意:当动作包含在 任务(例如 FutureTask) 显式地或通过诸如 提交,这些任务对象捕获并 维护计算异常,以及 所以它们不会导致突然 终止,和内部 异常不会传递给这个 方法。

当您提交 Runnable 时,它​​会被包裹在 Future 中。

你的 afterExecute 应该是这样的:

public final class ExtendedExecutor extends ThreadPoolExecutor 

    // ...

    protected void afterExecute(Runnable r, Throwable t) 
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) 
            try 
                Future<?> future = (Future<?>) r;
                if (future.isDone()) 
                    future.get();
                
             catch (CancellationException ce) 
                t = ce;
             catch (ExecutionException ee) 
                t = ee.getCause();
             catch (InterruptedException ie) 
                Thread.currentThread().interrupt();
            
        
        if (t != null) 
            System.out.println(t);
        
    

【讨论】:

谢谢,我最终使用了这个解决方案。此外,如果有人感兴趣:其他人建议不要继承 ExecutorService,但我还是这样做了,因为我想在任务完成时监控它们,而不是等待所有任务终止,然后在所有返回的 Futures 上调用 get() . 另一种子类化执行器的方法是继承 FutureTask 并覆盖其“完成”方法 Tom >> 您能否发布您的示例 sn-p 代码,其中您将 ExecutorService 子类化以在任务完成时监控它们... 如果您使用 ComplableFuture.runAsync,此答案将不起作用,因为 afterExecute 将包含一个包私有的对象,并且无法访问 throwable。我通过结束通话来解决它。请参阅下面的答案。 我们是否必须使用future.isDone()检查未来是否完成?由于afterExecuteRunnable 完成后运行,我假设future.isDone() 总是返回true【参考方案3】:

这种行为的解释就在javadoc for afterExecute:

注意:当动作包含在 任务(例如 FutureTask) 显式地或通过诸如 提交,这些任务对象捕获并 维护计算异常,以及 所以它们不会导致突然 终止,和内部 异常不会传递给这个 方法。

【讨论】:

【参考方案4】:

我通过包装提交给执行程序的提供的可运行文件来解决它。

CompletableFuture.runAsync(() -> 
        try 
              runnable.run();
         catch (Throwable e) 
              Log.info(Concurrency.class, "runAsync", e);
        
, executorService);

【讨论】:

可以使用CompletableFuturewhenComplete()方法提高可读性。 @EduardWirch 这行得通,但你不能从 whenComplete() 中抛出异常【参考方案5】:

我正在使用来自jcabi-log 的VerboseRunnable 类,它会吞下所有异常并记录它们。很方便,例如:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() 
      public void run()  
        // the code, which may throw
      
    ,
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);

【讨论】:

【参考方案6】:

另一个解决方案是使用 ManagedTaskManagedTaskListener

您需要一个 CallableRunnable 来实现接口 ManagedTask

方法getManagedTaskListener返回你想要的实例。

public ManagedTaskListener getManagedTaskListener() 

您在 ManagedTaskListener 中实现 taskDone 方法:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) 
    if (exception != null) 
        LOGGER.log(Level.SEVERE, exception.getMessage());
    

更多关于managed task lifecycle and listener的细节。

【讨论】:

【参考方案7】:

这行得通

它是从 SingleThreadExecutor 派生的,但你可以很容易地适应它 Java 8 lamdas 代码,但易于修复

它会创建一个单线程的Executor,可以处理很多任务;并将等待当前一个结束执行以开始下一个

如果发生未捕获的错误或异常,uncaughtExceptionHandler 将捕获它

公共最终类 SingleThreadExecutorWithExceptions 公共静态 ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) ThreadFactory 工厂 = (Runnable runnable) -> 最终线程 newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions"); newThread.setUncaughtExceptionHandler((final Thread cugthThread,final Throwable throwable) -> uncaughtExceptionHandler.uncaughtException(caughthThread, throwable); ); 返回新线程; ; 返回新的 FinalizableDelegatedExecutorService (新的线程池执行器(1, 1, 0L,时间单位.毫秒, 新的 LinkedBlockingQueue(), 工厂) protected void afterExecute(Runnable runnable, Throwable throwable) super.afterExecute(runnable, throwable); if (throwable == null && runnable instanceof Future) 尝试 未来的未来=(未来)可运行; 如果 (future.isDone()) 未来.get(); 捕捉(CancellationException ce) 可投掷 = ce; 捕捉(ExecutionException ee) throwable = ee.getCause(); 捕捉(InterruptedException 即) Thread.currentThread().interrupt(); // 忽略/重置 if (throwable != null) uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable); ); 私有静态类 FinalizableDelegatedExecutorService 扩展 DelegatedExecutorService FinalizableDelegatedExecutorService(ExecutorService 执行者) 超级(执行者); 受保护的无效finalize() super.shutdown(); /** * 仅公开 ExecutorService 方法的包装类 * 的 ExecutorService 实现。 */ 私有静态类 DelegatedExecutorService 扩展 AbstractExecutorService 私有最终 ExecutorService e; DelegatedExecutorService(ExecutorService 执行者) e = 执行者; 公共无效执行(可运行命令) e.execute(命令); 公共无效关机() e.shutdown(); 公共列表 shutdownNow() 返回 e.shutdownNow(); public boolean isShutdown() return e.isShutdown(); public boolean isTerminated() return e.isTerminated(); public boolean awaitTermination(long timeout, TimeUnit unit) 抛出 InterruptedException 返回 e.awaitTermination(超时,单位); 公共未来提交(可运行任务) 返回 e.submit(task); 公共未来提交(可调用任务) 返回 e.submit(task); 公共未来提交(可运行任务,T结果) 返回 e.submit(任务,结果); 公共列表>invokeAll(集合>任务) 抛出 InterruptedException 返回 e.invokeAll(任务); 公共列表>调用所有(集合>任务, 长超时,TimeUnit 单位) 抛出 InterruptedException return e.invokeAll(tasks, timeout, unit); 公共T调用Any(集合>任务) 抛出 InterruptedException, ExecutionException 返回 e.invokeAny(tasks); 公共 T 调用任何(集合>任务, 长超时,TimeUnit 单位) 抛出 InterruptedException、ExecutionException、TimeoutException 返回 e.invokeAny(任务,超时,单位); 私有 SingleThreadExecutorWithExceptions()

【讨论】:

不幸的是,使用 finalize 有点不稳定,因为它只会在“稍后在垃圾收集器收集它时”被调用(或者在线程的情况下可能不会,不知道)......跨度> 【参考方案8】:

如果您想监控任务的执行,您可以旋转 1 或 2 个线程(可能更多取决于负载)并使用它们从 ExecutionCompletionService 包装器中获取任务。

【讨论】:

【参考方案9】:

如果您的 ExecutorService 来自外部源(即,不能继承 ThreadPoolExecutor 并覆盖 afterExecute()),您可以使用动态代理来实现所需的行为:

public static ExecutorService errorAware(final ExecutorService executor) 
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] ExecutorService.class,
            (proxy, method, args) -> 
                if (method.getName().equals("submit")) 
                    final Object arg0 = args[0];
                    if (arg0 instanceof Runnable) 
                        args[0] = new Runnable() 
                            @Override
                            public void run() 
                                final Runnable task = (Runnable) arg0;
                                try 
                                    task.run();
                                    if (task instanceof Future<?>) 
                                        final Future<?> future = (Future<?>) task;

                                        if (future.isDone()) 
                                            try 
                                                future.get();
                                             catch (final CancellationException ce) 
                                                // Your error-handling code here
                                                ce.printStackTrace();
                                             catch (final ExecutionException ee) 
                                                // Your error-handling code here
                                                ee.getCause().printStackTrace();
                                             catch (final InterruptedException ie) 
                                                Thread.currentThread().interrupt();
                                            
                                        
                                    
                                 catch (final RuntimeException re) 
                                    // Your error-handling code here
                                    re.printStackTrace();
                                    throw re;
                                 catch (final Error e) 
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                
                            
                        ;
                     else if (arg0 instanceof Callable<?>) 
                        args[0] = new Callable<Object>() 
                            @Override
                            public Object call() throws Exception 
                                final Callable<?> task = (Callable<?>) arg0;
                                try 
                                    return task.call();
                                 catch (final Exception e) 
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                 catch (final Error e) 
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                
                            
                        ;
                    
                
                return method.invoke(executor, args);
            );

【讨论】:

【参考方案10】:

这是因为AbstractExecutorService :: submit 将您的runnable 包装成RunnableFuture(除了FutureTask),如下所示

AbstractExecutorService.java

public Future<?> submit(Runnable task) 
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;

然后execute 将它传递给WorkerWorker.run() 将调用下面。

ThreadPoolExecutor.java

final void runWorker(Worker w) 
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try 
        while (task != null || (task = getTask()) != null) 
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try 
                beforeExecute(wt, task);
                Throwable thrown = null;
                try 
                    task.run();           /////////HERE////////
                 catch (RuntimeException x) 
                    thrown = x; throw x;
                 catch (Error x) 
                    thrown = x; throw x;
                 catch (Throwable x) 
                    thrown = x; throw new Error(x);
                 finally 
                    afterExecute(task, thrown);
                
             finally 
                task = null;
                w.completedTasks++;
                w.unlock();
            
        
        completedAbruptly = false;
     finally 
        processWorkerExit(w, completedAbruptly);
    

最后task.run();在上面的代码调用中会调用 FutureTask.run()。这是异常处理程序代码,因为 这你没有得到预期的异常。

class FutureTask<V> implements RunnableFuture<V>

public void run() 
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try 
        Callable<V> c = callable;
        if (c != null && state == NEW) 
            V result;
            boolean ran;
            try 
                result = c.call();
                ran = true;
             catch (Throwable ex)    /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            
            if (ran)
                set(result);
        
     finally 
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    

【讨论】:

【参考方案11】:

这类似于 mmm 的解决方案,但更容易理解。让您的任务扩展一个包含 run() 方法的抽象类。

public abstract Task implements Runnable 

    public abstract void execute();

    public void run() 
      try 
        execute();
       catch (Throwable t) 
        // handle it  
      
    



public MySampleTask extends Task 
    public void execute() 
        // heavy, error-prone code here
    

【讨论】:

【参考方案12】:

我会为它提供一个 ThreadFactory 实例来创建新线程并为它们提供一个 UncaughtExceptionHandler,而不是子类化 ThreadPoolExecutor

【讨论】:

我也试过这个,但似乎从未调用 uncaughtException 方法。我相信这是因为 ThreadPoolExecutor 类中的工作线程正在捕获异常。 未调用 uncaughtException 方法,因为 ExecutorService 的 submit 方法将 Callable/Runnable 包装在 Future 中;异常正在那里被捕获。 如果你使用 execute(): void,而不是 submit():Future,它应该可以工作。

以上是关于处理来自 Java ExecutorService 任务的异常的主要内容,如果未能解决你的问题,请参考以下文章

Java并发包中的几种ExecutorService

Java多线程ExecutorService与ExecutorCompletionService

Jboss Java EE 容器和一个 ExecutorService

使用CompletionService结合ExecutorService批处理调用存储过程任务实例

Java:在特定队列大小后阻止提交的 ExecutorService [重复]

Java ExecutorService - 处于等待状态的线程