处理来自 Java ExecutorService 任务的异常
Posted
技术标签:
【中文标题】处理来自 Java ExecutorService 任务的异常【英文标题】:Handling exceptions from Java ExecutorService tasks 【发布时间】:2011-01-15 22:17:56 【问题描述】:我正在尝试使用 Java 的 ThreadPoolExecutor
类以固定数量的线程运行大量重量级任务。每个任务都有很多地方可能会因为异常而失败。
我已经继承了ThreadPoolExecutor
并且我已经覆盖了afterExecute
方法,该方法应该提供在运行任务时遇到的任何未捕获的异常。但是,我似乎无法让它工作。
例如:
public class ThreadPoolErrors extends ThreadPoolExecutor
public ThreadPoolErrors()
super( 1, // core threads
1, // max threads
1, // timeout
TimeUnit.MINUTES, // timeout units
new LinkedBlockingQueue<Runnable>() // work queue
);
protected void afterExecute(Runnable r, Throwable t)
super.afterExecute(r, t);
if(t != null)
System.out.println("Got an error: " + t);
else
System.out.println("Everything's fine--situation normal!");
public static void main( String [] args)
ThreadPoolErrors threadPool = new ThreadPoolErrors();
threadPool.submit(
new Runnable()
public void run()
throw new RuntimeException("Ouch! Got an error.");
);
threadPool.shutdown();
这个程序的输出是“一切正常——情况正常!”即使提交给线程池的唯一 Runnable 抛出异常。有什么线索知道这里发生了什么吗?
谢谢!
【问题讨论】:
你从来没有问过任务的未来,那里发生了什么。整个服务执行器或程序不会崩溃。异常被捕获并包装在 ExecutionException 下。如果你调用future.get(),他会重新抛出。 PS:future.isDone() [请阅读真正的 api 名称] 将返回 true,即使可运行对象错误地完成。因为任务是真实完成的。 【参考方案1】:警告:需要注意的是这种解决方案会阻塞调用线程。
如果你想处理任务抛出的异常,那么通常使用Callable
而不是Runnable
更好。
Callable.call()
被允许抛出检查异常,这些异常被传播回调用线程:
Callable task = ...
Future future = executor.submit(task);
try
future.get();
catch (ExecutionException ex)
ex.getCause().printStackTrace();
如果Callable.call()
抛出异常,这将被包裹在ExecutionException
中并由Future.get()
抛出。
这可能比继承ThreadPoolExecutor
更可取。如果异常是可恢复的,它还使您有机会重新提交任务。
【讨论】:
> Callable.call() 允许抛出已检查异常,这些异常会传播回调用线程: 请注意,抛出的异常只会传播到调用线程如果future.get()
或其重载版本被调用。
很完美,但是如果我并行运行任务并且不想阻塞执行怎么办?
不要使用这个解决方案,因为它破坏了使用 ExecutorService 的整个目的。 ExecutorService 是一种异步执行机制,能够在后台执行任务。如果你在执行后立即调用future.get(),它将阻塞调用线程,直到任务完成。
这个解决方案的评价不应该那么高。 Future.get() 同步工作,并将充当阻止程序,直到 Runnable 或 Callable 被执行,并且如上所述破坏了使用 Executor Service 的目的
正如#nhylated 所指出的,这值得一个jdk BUG。如果未调用 Future.get(),则任何来自 Callable 的未捕获异常都会被静默忽略。非常糟糕的设计.... 只花了 1 天以上的时间来找出一个使用它的库,而 jdk 默默地忽略了异常。而且,这在 jdk12 中仍然存在。【参考方案2】:
来自docs:
注意:当动作包含在 任务(例如 FutureTask) 显式地或通过诸如 提交,这些任务对象捕获并 维护计算异常,以及 所以它们不会导致突然 终止,和内部 异常不会传递给这个 方法。
当您提交 Runnable 时,它会被包裹在 Future 中。
你的 afterExecute 应该是这样的:
public final class ExtendedExecutor extends ThreadPoolExecutor
// ...
protected void afterExecute(Runnable r, Throwable t)
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>)
try
Future<?> future = (Future<?>) r;
if (future.isDone())
future.get();
catch (CancellationException ce)
t = ce;
catch (ExecutionException ee)
t = ee.getCause();
catch (InterruptedException ie)
Thread.currentThread().interrupt();
if (t != null)
System.out.println(t);
【讨论】:
谢谢,我最终使用了这个解决方案。此外,如果有人感兴趣:其他人建议不要继承 ExecutorService,但我还是这样做了,因为我想在任务完成时监控它们,而不是等待所有任务终止,然后在所有返回的 Futures 上调用 get() . 另一种子类化执行器的方法是继承 FutureTask 并覆盖其“完成”方法 Tom >> 您能否发布您的示例 sn-p 代码,其中您将 ExecutorService 子类化以在任务完成时监控它们... 如果您使用 ComplableFuture.runAsync,此答案将不起作用,因为 afterExecute 将包含一个包私有的对象,并且无法访问 throwable。我通过结束通话来解决它。请参阅下面的答案。 我们是否必须使用future.isDone()
检查未来是否完成?由于afterExecute
在Runnable
完成后运行,我假设future.isDone()
总是返回true
。【参考方案3】:
这种行为的解释就在javadoc for afterExecute:
注意:当动作包含在 任务(例如 FutureTask) 显式地或通过诸如 提交,这些任务对象捕获并 维护计算异常,以及 所以它们不会导致突然 终止,和内部 异常不会传递给这个 方法。
【讨论】:
【参考方案4】:我通过包装提交给执行程序的提供的可运行文件来解决它。
CompletableFuture.runAsync(() ->
try
runnable.run();
catch (Throwable e)
Log.info(Concurrency.class, "runAsync", e);
, executorService);
【讨论】:
可以使用CompletableFuture
的whenComplete()
方法提高可读性。
@EduardWirch 这行得通,但你不能从 whenComplete() 中抛出异常【参考方案5】:
我正在使用来自jcabi-log 的VerboseRunnable
类,它会吞下所有异常并记录它们。很方便,例如:
import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
new VerboseRunnable(
Runnable()
public void run()
// the code, which may throw
,
true // it means that all exceptions will be swallowed and logged
),
1, 1, TimeUnit.MILLISECONDS
);
【讨论】:
【参考方案6】:另一个解决方案是使用 ManagedTask 和 ManagedTaskListener。
您需要一个 Callable 或 Runnable 来实现接口 ManagedTask。
方法getManagedTaskListener
返回你想要的实例。
public ManagedTaskListener getManagedTaskListener()
您在 ManagedTaskListener 中实现 taskDone
方法:
@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception)
if (exception != null)
LOGGER.log(Level.SEVERE, exception.getMessage());
更多关于managed task lifecycle and listener的细节。
【讨论】:
【参考方案7】:这行得通
它是从 SingleThreadExecutor 派生的,但你可以很容易地适应它 Java 8 lamdas 代码,但易于修复它会创建一个单线程的Executor,可以处理很多任务;并将等待当前一个结束执行以开始下一个
如果发生未捕获的错误或异常,uncaughtExceptionHandler 将捕获它
公共最终类 SingleThreadExecutorWithExceptions 公共静态 ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) ThreadFactory 工厂 = (Runnable runnable) -> 最终线程 newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions"); newThread.setUncaughtExceptionHandler((final Thread cugthThread,final Throwable throwable) -> uncaughtExceptionHandler.uncaughtException(caughthThread, throwable); ); 返回新线程; ; 返回新的 FinalizableDelegatedExecutorService (新的线程池执行器(1, 1, 0L,时间单位.毫秒, 新的 LinkedBlockingQueue(), 工厂) protected void afterExecute(Runnable runnable, Throwable throwable) super.afterExecute(runnable, throwable); if (throwable == null && runnable instanceof Future) 尝试 未来的未来=(未来)可运行; 如果 (future.isDone()) 未来.get(); 捕捉(CancellationException ce) 可投掷 = ce; 捕捉(ExecutionException ee) throwable = ee.getCause(); 捕捉(InterruptedException 即) Thread.currentThread().interrupt(); // 忽略/重置 if (throwable != null) uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable); ); 私有静态类 FinalizableDelegatedExecutorService 扩展 DelegatedExecutorService FinalizableDelegatedExecutorService(ExecutorService 执行者) 超级(执行者); 受保护的无效finalize() super.shutdown(); /** * 仅公开 ExecutorService 方法的包装类 * 的 ExecutorService 实现。 */ 私有静态类 DelegatedExecutorService 扩展 AbstractExecutorService 私有最终 ExecutorService e; DelegatedExecutorService(ExecutorService 执行者) e = 执行者; 公共无效执行(可运行命令) e.execute(命令); 公共无效关机() e.shutdown(); 公共列表 shutdownNow() 返回 e.shutdownNow(); public boolean isShutdown() return e.isShutdown(); public boolean isTerminated() return e.isTerminated(); public boolean awaitTermination(long timeout, TimeUnit unit) 抛出 InterruptedException 返回 e.awaitTermination(超时,单位); 公共未来提交(可运行任务) 返回 e.submit(task); 公共未来提交(可调用任务) 返回 e.submit(task); 公共未来提交(可运行任务,T结果) 返回 e.submit(任务,结果); 公共列表>invokeAll(集合>任务) 抛出 InterruptedException 返回 e.invokeAll(任务); 公共列表>调用所有(集合>任务, 长超时,TimeUnit 单位) 抛出 InterruptedException return e.invokeAll(tasks, timeout, unit); 公共T调用Any(集合>任务) 抛出 InterruptedException, ExecutionException 返回 e.invokeAny(tasks); 公共 T 调用任何(集合>任务, 长超时,TimeUnit 单位) 抛出 InterruptedException、ExecutionException、TimeoutException 返回 e.invokeAny(任务,超时,单位); 私有 SingleThreadExecutorWithExceptions()【讨论】:
不幸的是,使用 finalize 有点不稳定,因为它只会在“稍后在垃圾收集器收集它时”被调用(或者在线程的情况下可能不会,不知道)......跨度> 【参考方案8】:如果您想监控任务的执行,您可以旋转 1 或 2 个线程(可能更多取决于负载)并使用它们从 ExecutionCompletionService 包装器中获取任务。
【讨论】:
【参考方案9】:如果您的 ExecutorService
来自外部源(即,不能继承 ThreadPoolExecutor
并覆盖 afterExecute()
),您可以使用动态代理来实现所需的行为:
public static ExecutorService errorAware(final ExecutorService executor)
return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[] ExecutorService.class,
(proxy, method, args) ->
if (method.getName().equals("submit"))
final Object arg0 = args[0];
if (arg0 instanceof Runnable)
args[0] = new Runnable()
@Override
public void run()
final Runnable task = (Runnable) arg0;
try
task.run();
if (task instanceof Future<?>)
final Future<?> future = (Future<?>) task;
if (future.isDone())
try
future.get();
catch (final CancellationException ce)
// Your error-handling code here
ce.printStackTrace();
catch (final ExecutionException ee)
// Your error-handling code here
ee.getCause().printStackTrace();
catch (final InterruptedException ie)
Thread.currentThread().interrupt();
catch (final RuntimeException re)
// Your error-handling code here
re.printStackTrace();
throw re;
catch (final Error e)
// Your error-handling code here
e.printStackTrace();
throw e;
;
else if (arg0 instanceof Callable<?>)
args[0] = new Callable<Object>()
@Override
public Object call() throws Exception
final Callable<?> task = (Callable<?>) arg0;
try
return task.call();
catch (final Exception e)
// Your error-handling code here
e.printStackTrace();
throw e;
catch (final Error e)
// Your error-handling code here
e.printStackTrace();
throw e;
;
return method.invoke(executor, args);
);
【讨论】:
【参考方案10】:这是因为AbstractExecutorService :: submit
将您的runnable
包装成RunnableFuture
(除了FutureTask
),如下所示
AbstractExecutorService.java
public Future<?> submit(Runnable task)
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
execute(ftask);
return ftask;
然后execute
将它传递给Worker
和Worker.run()
将调用下面。
ThreadPoolExecutor.java
final void runWorker(Worker w)
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try
while (task != null || (task = getTask()) != null)
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try
beforeExecute(wt, task);
Throwable thrown = null;
try
task.run(); /////////HERE////////
catch (RuntimeException x)
thrown = x; throw x;
catch (Error x)
thrown = x; throw x;
catch (Throwable x)
thrown = x; throw new Error(x);
finally
afterExecute(task, thrown);
finally
task = null;
w.completedTasks++;
w.unlock();
completedAbruptly = false;
finally
processWorkerExit(w, completedAbruptly);
最后
task.run();
在上面的代码调用中会调用FutureTask.run()
。这是异常处理程序代码,因为 这你没有得到预期的异常。
class FutureTask<V> implements RunnableFuture<V>
public void run()
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try
Callable<V> c = callable;
if (c != null && state == NEW)
V result;
boolean ran;
try
result = c.call();
ran = true;
catch (Throwable ex) /////////HERE////////
result = null;
ran = false;
setException(ex);
if (ran)
set(result);
finally
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
【讨论】:
【参考方案11】:这类似于 mmm 的解决方案,但更容易理解。让您的任务扩展一个包含 run() 方法的抽象类。
public abstract Task implements Runnable
public abstract void execute();
public void run()
try
execute();
catch (Throwable t)
// handle it
public MySampleTask extends Task
public void execute()
// heavy, error-prone code here
【讨论】:
【参考方案12】:我会为它提供一个 ThreadFactory 实例来创建新线程并为它们提供一个 UncaughtExceptionHandler,而不是子类化 ThreadPoolExecutor
【讨论】:
我也试过这个,但似乎从未调用 uncaughtException 方法。我相信这是因为 ThreadPoolExecutor 类中的工作线程正在捕获异常。 未调用 uncaughtException 方法,因为 ExecutorService 的 submit 方法将 Callable/Runnable 包装在 Future 中;异常正在那里被捕获。 如果你使用 execute(): void,而不是 submit():Future,它应该可以工作。以上是关于处理来自 Java ExecutorService 任务的异常的主要内容,如果未能解决你的问题,请参考以下文章
Java多线程ExecutorService与ExecutorCompletionService
Jboss Java EE 容器和一个 ExecutorService
使用CompletionService结合ExecutorService批处理调用存储过程任务实例