意外的异步行为:Springs 的 @Async 与 RxJava
Posted
技术标签:
【中文标题】意外的异步行为:Springs 的 @Async 与 RxJava【英文标题】:Unexpected async behavior: Springs's @Async vs RxJava 【发布时间】:2015-11-24 01:46:37 【问题描述】:我在玩 Spring、RxJava 和非阻塞数据处理。在我的测试应用程序中,我想实现以下测试工作流程:
-
[RT] 接收请求
[RT] 在工作线程中开始异步处理
[WT] 做一些(昂贵的)初始化工作
[WT] 异步调用远程系统获取值
[HT] 执行对远程系统的请求
[HT] 将响应结果转发到工作线程
[WT] 使用远程系统的结果做更多(昂贵的)工作
[WT] 返回最终结果
RT:请求线程(Tomcat NIO)
WT : Worker Thread(固定大小为 1 且队列大小为 5 的线程池)
HT : Hystrix Thread(Hystrix 线程池,默认设置)
(这只是一个示例,结合对远程资源的依赖来模拟昂贵的数据处理)
我有两种代码变体:
-
使用
@Async
调用 WT(第 2 步)和 Rx 的 Observable
调用其余部分 (http://localhost:9001/value
)
仅使用 Rx 的 Observables (http://localhost:9001/value-rx
)
(http://localhost:9002/value
是远程资源)
变体 2 运行良好,但变体 1(带有@Async
)遇到了一些问题。通过分析异常、线程转储、线程池状态和日志文件,看起来ListenableFuture
(由步骤2中的@Async
服务方法返回)无限阻塞线程池,线程本身正在等待。因此,RxJava 无法在给定的线程池中运行回调代码(步骤 6)。 30秒后抛出异常,整个进程失败,因为线程池仍然被阻塞,我不明白为什么。
如果我多次使用变体 1,第二个(以及所有后续请求)在步骤 2(而不是 6)中失败,因为线程池(大小 = 1)仍然被 ListenableFuture
(堆栈跟踪如下)。
变体 2 能够“同时”处理多个请求而不会出现问题,直到队列已满,即使只有 1 个请求线程和 1 个工作线程。
在这两种情况下,我都使用this 的修改版本将Observable
的实例映射到ListenableFuture
。
我已向控制器和服务类添加了额外的日志记录。这样可以更轻松地查看代码部分在哪个线程中执行。
为什么 @Async
会导致此问题,我该如何解决?
代码如下:
App1Controller
@Slf4j
@RestController
public class App1Controller
@Autowired
private App1Service app1Service;
@ResponseBody
@RequestMapping("/value")
public ListenableFuture<String> value()
final ListenableFuture<String> future;
log.info("before invoke 'app1Service'");
future = this.app1Service.value();
log.info("after invoke 'app1Service'");
return future;
@ResponseBody
@RequestMapping("/value-rx")
public ListenableFuture<String> valueRx()
final Observable<String> observable;
log.info("before invoke 'app1Service'");
observable = this.app1Service.valueRx();
log.info("after invoke 'app1Service'");
return new ObservableListenableFuture<>(observable);
App1Service
@Slf4j
@Service
public class App1Service
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private App2Service app2Service;
@Async
public ListenableFuture<String> value()
final ListenableFuture<String> future;
log.info("before start processing");
this.doSomeStuff();
future = new ObservableListenableFuture<>(this.valueFromApp2Service());
log.info("after start processing");
return future;
public Observable<String> valueRx()
final Observable<String> observable;
log.info("before start processing");
observable = Observable.<String>create(s ->
this.doSomeStuff();
this.valueFromApp2Service().subscribe(
result ->
log.info("next (processing)");
s.onNext(result);
,
throwable ->
log.info("error (processing)");
s.onError(throwable);
,
() ->
log.info("completed (processing)");
s.onCompleted();
);
).subscribeOn(Schedulers.from(this.taskExecutor));
log.info("after start processing");
return observable;
private Observable<String> valueFromApp2Service()
final AsyncSubject<String> asyncSubject;
log.info("before invoke 'app2Service'");
asyncSubject = AsyncSubject.create();
this.app2Service.value().observeOn(Schedulers.from(this.taskExecutor)).subscribe(
result ->
log.info("next (from 'app2Service')");
asyncSubject.onNext(this.doSomeMoreStuff(result));
, throwable ->
log.info("error (from 'app2Service')");
asyncSubject.onError(throwable);
, () ->
log.info("completed (from 'app2Service')");
asyncSubject.onCompleted();
);
log.info("after invoke 'app2Service'");
return asyncSubject;
private void doSomeStuff()
log.info("do some expensive stuff");
this.sleep(1000);
log.info("finish some expensive stuff");
private String doSomeMoreStuff(final String valueFromRemote)
log.info("do some more expensive stuff with ''", valueFromRemote);
this.sleep(2000);
log.info("finish some more expensive stuff with ''", valueFromRemote);
return "MODIFIED " + valueFromRemote;
private void sleep(final long milliSeconds)
try
Thread.sleep(milliSeconds);
catch (final InterruptedException e)
e.printStackTrace();
App2Service
@Slf4j
@Service
public class App2Service
@HystrixCommand(commandKey = "app2")
public Observable<String> value()
Observable<String> observable;
log.info("before invoke remote service");
observable = new ObservableResult<String>()
@Override
public String invoke()
log.info("invoke");
return new RestTemplate().getForEntity("http://localhost:9002/value", String.class).getBody();
;
log.info("after invoke remote service");
return observable;
配置
应用程序(主/配置类):
@EnableCircuitBreaker
@SpringBootApplication
public class Application
public static void main(final String[] args)
SpringApplication.run(Application.class, args);
@Configuration
@EnableAsync
public static class AsyncConfiguration
@Bean
public TaskExecutor taskExecutor()
final ThreadPoolTaskExecutor taskExecutor;
taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(1);
taskExecutor.setMaxPoolSize(1);
taskExecutor.setQueueCapacity(5);
taskExecutor.setThreadNamePrefix("worker-");
return taskExecutor;
application.properties:
server.port=9001
server.tomcat.max-threads=1
hystrix.command.app2.fallback.enabled=false
hystrix.command.app2.execution.isolation.thread.timeoutInMilliseconds=15000
变体 1 的日志输出(第一次调用)
16:06:31.871 [nio-9001-exec-1] before invoke 'app1Service'
16:06:31.879 [nio-9001-exec-1] after invoke 'app1Service'
16:06:31.887 [ worker-1] before start processing
16:06:31.888 [ worker-1] do some expensive stuff
16:06:32.890 [ worker-1] finish some expensive stuff
16:06:32.891 [ worker-1] before invoke 'app2Service'
16:06:33.135 [x-App2Service-1] before invoke remote service
16:06:33.136 [x-App2Service-1] after invoke remote service
16:06:33.137 [x-App2Service-1] invoke
16:06:33.167 [ worker-1] after invoke 'app2Service'
16:06:33.172 [ worker-1] after start processing
16:07:02.816 [nio-9001-exec-1] Exception Processing ErrorPage[errorCode=0, location=/error]
java.lang.IllegalStateException: Cannot forward after response has been committed
at org.apache.catalina.core.ApplicationDispatcher.doForward(ApplicationDispatcher.java:328)
at org.apache.catalina.core.ApplicationDispatcher.forward(ApplicationDispatcher.java:318)
at org.apache.catalina.core.StandardHostValve.custom(StandardHostValve.java:439)
at org.apache.catalina.core.StandardHostValve.status(StandardHostValve.java:305)
at org.apache.catalina.core.StandardHostValve.throwable(StandardHostValve.java:399)
at org.apache.catalina.core.AsyncContextImpl.setErrorState(AsyncContextImpl.java:438)
at org.apache.catalina.connector.CoyoteAdapter.asyncDispatch(CoyoteAdapter.java:291)
at org.apache.coyote.http11.AbstractHttp11Processor.asyncDispatch(AbstractHttp11Processor.java:1709)
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:649)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1521)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1478)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:745)
变体 2 的日志输出(第一次调用)
16:07:54.465 [nio-9001-exec-1] before invoke 'app1Service'
16:07:54.472 [nio-9001-exec-1] before start processing
16:07:54.500 [nio-9001-exec-1] after start processing
16:07:54.500 [nio-9001-exec-1] after invoke 'app1Service'
16:07:54.517 [ worker-1] do some expensive stuff
16:07:55.522 [ worker-1] finish some expensive stuff
16:07:55.522 [ worker-1] before invoke 'app2Service'
16:07:55.684 [x-App2Service-1] before invoke remote service
16:07:55.685 [x-App2Service-1] after invoke remote service
16:07:55.686 [x-App2Service-1] invoke
16:07:55.717 [ worker-1] after invoke 'app2Service'
16:08:05.786 [ worker-1] next (from 'app2Service')
16:08:05.786 [ worker-1] do some more expensive stuff with 'value from app2 service'
16:08:07.791 [ worker-1] finish some more expensive stuff with 'value from app2 service'
16:08:07.791 [ worker-1] completed (from 'app2Service')
16:08:07.791 [ worker-1] next (processing)
16:08:07.792 [ worker-1] completed (processing)
WT 的线程转储(使用变体 1 后)
"worker-1" #24 prio=5 os_prio=31 tid=0x00007fe2be8cf000 nid=0x5e03 waiting on condition [0x0000000123413000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c0d68fb0> (a org.springframework.util.concurrent.ListenableFutureTask)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
at java.util.concurrent.FutureTask.get(FutureTask.java:191)
at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:122)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:110)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- <0x00000006c0d68170> (a java.util.concurrent.ThreadPoolExecutor$Worker)
WT 的线程转储(使用变体 2 后)
"worker-1" #24 prio=5 os_prio=31 tid=0x00007fc6136dd800 nid=0x5207 waiting on condition [0x000000012d638000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c02f5388> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
解决方案
异步拦截器使用简单的Future
,无法处理ListenableFuture
。在我再次查看线程转储后,我注意到FutureTask.get
获取。这是一个阻塞调用。这意味着当仅使用 1 个线程时,变体 1 是一个内置死锁。
此代码有效:
控制器
@ResponseBody
@RequestMapping("/value")
public ListenableFuture<String> value()
final SettableListenableFuture<String> future;
this.app1Service.value(future);
return future;
服务
@Async
public void value(final SettableListenableFuture<String> future)
this.doSomeStuff();
this.valueFromApp2Service().subscribe(future::set, future::setException);
【问题讨论】:
在代码示例中添加导入会很好...... 【参考方案1】:我想你已经回答了你的问题(或者我遗漏了一些东西):
如果我多次使用变体 1,则第二个(以及以下所有 请求),在第 2 步(而不是第 6 步)中失败,因为线程 pool (size = 1) 仍然被 ListenableFuture (stack 跟踪如下)。
您的TaskExecutor
只有 1 个线程可用,用于@Async
。然后,从该线程中,您想再次将 TaskExecutor
用于 Observable:
this.app2Service.value().observeOn(Schedulers.from(this.taskExecutor)).subscribe(
但是,您的池中没有更多线程可用。如果您增加 coreSize,或为 RxJava 的东西定义不同的 TaskExecutor,它应该可以工作。
编辑
如果你真的需要异步执行app1Service.value()
,可以去掉@Asynch,将任务显式提交给taskExecutor,这样就可以给@987654326添加回调@。如果将结果类型更改为DeferredResult
,则可以在执行ListenableFuture
的回调时设置它的结果:
@Autowired
private TaskExecutor taskExecutor;
@ResponseBody
@RequestMapping("/value")
public DeferredResult<String> value()
final DeferredResult<String> dr = new DeferredResult<String>();
taskExecutor.execute(() ->
final ListenableFuture<String> future = app1Service.value();
future.addCallback(new ListenableFutureCallback<String>()
@Override
public void onSuccess(String result)
dr.setResult(result);
@Override
public void onFailure(Throwable ex)
dr.setErrorResult(ex);
);
);
return dr;
或者,当然,使用您的解决方案,这样会更好。
【讨论】:
我还描述了变体 2 在相同条件下工作,并且可以同时处理更多请求。简单的解决方案是:我不能使用 ListenableFuture 作为 @Async 方法的返回类型。 Spring 只是使用 Future 功能并调用阻塞的 'get' 方法。 对。 Spring 在AsyncExecutionInterceptor
中调用future.get()
,因为这是它唯一能做的事情,它根本没有其他方法可以得到app1Service.value()
的计算结果来调度请求。实际上,我认为您根本不需要@Async
,因为您已经在使用 RxJava。以上是关于意外的异步行为:Springs 的 @Async 与 RxJava的主要内容,如果未能解决你的问题,请参考以下文章
Electron - 服务内容中的Async关键字会导致意外令牌