意外的异步行为:Springs 的 @Async 与 RxJava

Posted

技术标签:

【中文标题】意外的异步行为:Springs 的 @Async 与 RxJava【英文标题】:Unexpected async behavior: Springs's @Async vs RxJava 【发布时间】:2015-11-24 01:46:37 【问题描述】:

我在玩 Spring、RxJava 和非阻塞数据处理。在我的测试应用程序中,我想实现以下测试工作流程:

    [RT] 接收请求 [RT] 在工作线程中开始异步处理 [WT] 做一些(昂贵的)初始化工作 [WT] 异步调用远程系统获取值 [HT] 执行对远程系统的请求 [HT] 将响应结果转发到工作线程 [WT] 使用远程系统的结果做更多​​(昂贵的)工作 [WT] 返回最终结果

RT:请求线程(Tomcat NIO)

WT : Worker Thread(固定大小为 1 且队列大小为 5 的线程池)

HT : Hystrix Thread(Hystrix 线程池,默认设置)

(这只是一个示例,结合对远程资源的依赖来模拟昂贵的数据处理)

我有两种代码变体:

    使用 @Async 调用 WT(第 2 步)和 Rx 的 Observable 调用其余部分 (http://localhost:9001/value) 仅使用 Rx 的 Observables (http://localhost:9001/value-rx)

http://localhost:9002/value 是远程资源)

变体 2 运行良好,但变体 1(带有@Async)遇到了一些问题。通过分析异常、线程转储、线程池状态和日志文件,看起来ListenableFuture(由步骤2中的@Async服务方法返回)无限阻塞线程池,线程本身正在等待。因此,RxJava 无法在给定的线程池中运行回调代码(步骤 6)。 30秒后抛出异常,整个进程失败,因为线程池仍然被阻塞,我不明白为什么。

如果我多次使用变体 1,第二个(以及所有后续请求)在步骤 2(而不是 6)中失败,因为线程池(大小 = 1)仍然被 ListenableFuture(堆栈跟踪如下)。

变体 2 能够“同时”处理多个请求而不会出现问题,直到队列已满,即使只有 1 个请求线程和 1 个工作线程。

在这两种情况下,我都使用this 的修改版本将Observable 的实例映射到ListenableFuture。 我已向控制器和服务类添加了额外的日志记录。这样可以更轻松地查看代码部分在哪个线程中执行。

为什么 @Async 会导致此问题,我该如何解决?

代码如下:

App1Controller

@Slf4j
@RestController
public class App1Controller 

    @Autowired
    private App1Service app1Service;

    @ResponseBody
    @RequestMapping("/value")
    public ListenableFuture<String> value() 
        final ListenableFuture<String> future;
        log.info("before invoke 'app1Service'");
        future = this.app1Service.value();
        log.info("after invoke 'app1Service'");
        return future;
    

    @ResponseBody
    @RequestMapping("/value-rx")
    public ListenableFuture<String> valueRx() 
        final Observable<String> observable;

        log.info("before invoke 'app1Service'");
        observable = this.app1Service.valueRx();
        log.info("after invoke 'app1Service'");

        return new ObservableListenableFuture<>(observable);
    

App1Service

@Slf4j
@Service
public class App1Service 

    @Autowired
    private TaskExecutor taskExecutor;

    @Autowired
    private App2Service app2Service;

    @Async
    public ListenableFuture<String> value() 
        final ListenableFuture<String> future;

        log.info("before start processing");
        this.doSomeStuff();
        future = new ObservableListenableFuture<>(this.valueFromApp2Service());
        log.info("after start processing");

        return future;
    

    public Observable<String> valueRx() 
        final Observable<String> observable;

        log.info("before start processing");

        observable = Observable.<String>create(s -> 
            this.doSomeStuff();
            this.valueFromApp2Service().subscribe(
                    result -> 
                        log.info("next (processing)");
                        s.onNext(result);
                    ,
                    throwable -> 
                        log.info("error (processing)");
                        s.onError(throwable);
                    ,
                    () -> 
                        log.info("completed (processing)");
                        s.onCompleted();
                    );
        ).subscribeOn(Schedulers.from(this.taskExecutor));

        log.info("after start processing");

        return observable;
    

    private Observable<String> valueFromApp2Service() 
        final AsyncSubject<String> asyncSubject;

        log.info("before invoke 'app2Service'");

        asyncSubject = AsyncSubject.create();
        this.app2Service.value().observeOn(Schedulers.from(this.taskExecutor)).subscribe(
                result -> 
                    log.info("next (from 'app2Service')");
                    asyncSubject.onNext(this.doSomeMoreStuff(result));
                , throwable -> 
                    log.info("error (from 'app2Service')");
                    asyncSubject.onError(throwable);
                , () -> 
                    log.info("completed (from 'app2Service')");
                    asyncSubject.onCompleted();
                );

        log.info("after invoke 'app2Service'");

        return asyncSubject;
    

    private void doSomeStuff() 
        log.info("do some expensive stuff");
        this.sleep(1000);
        log.info("finish some expensive stuff");
    

    private String doSomeMoreStuff(final String valueFromRemote) 
        log.info("do some more expensive stuff with ''", valueFromRemote);
        this.sleep(2000);
        log.info("finish some more expensive stuff with ''", valueFromRemote);
        return "MODIFIED " + valueFromRemote;
    

    private void sleep(final long milliSeconds) 
        try 
            Thread.sleep(milliSeconds);
         catch (final InterruptedException e) 
            e.printStackTrace();
        
    

App2Service

@Slf4j
@Service
public class App2Service 

    @HystrixCommand(commandKey = "app2")
    public Observable<String> value() 
        Observable<String> observable;

        log.info("before invoke remote service");

        observable = new ObservableResult<String>() 

            @Override
            public String invoke() 
                log.info("invoke");
                return new RestTemplate().getForEntity("http://localhost:9002/value", String.class).getBody();
            

        ;

        log.info("after invoke remote service");

        return observable;
    

配置

应用程序(主/配置类):

@EnableCircuitBreaker
@SpringBootApplication
public class Application 

    public static void main(final String[] args) 
        SpringApplication.run(Application.class, args);
    

    @Configuration
    @EnableAsync
    public static class AsyncConfiguration 

        @Bean
        public TaskExecutor taskExecutor() 
            final ThreadPoolTaskExecutor taskExecutor;

            taskExecutor = new ThreadPoolTaskExecutor();
            taskExecutor.setCorePoolSize(1);
            taskExecutor.setMaxPoolSize(1);
            taskExecutor.setQueueCapacity(5);
            taskExecutor.setThreadNamePrefix("worker-");

            return taskExecutor;
        
    

application.properties:

server.port=9001
server.tomcat.max-threads=1
hystrix.command.app2.fallback.enabled=false
hystrix.command.app2.execution.isolation.thread.timeoutInMilliseconds=15000

变体 1 的日志输出(第一次调用)

16:06:31.871 [nio-9001-exec-1] before invoke 'app1Service'
16:06:31.879 [nio-9001-exec-1] after invoke 'app1Service'
16:06:31.887 [       worker-1] before start processing
16:06:31.888 [       worker-1] do some expensive stuff
16:06:32.890 [       worker-1] finish some expensive stuff
16:06:32.891 [       worker-1] before invoke 'app2Service'
16:06:33.135 [x-App2Service-1] before invoke remote service
16:06:33.136 [x-App2Service-1] after invoke remote service
16:06:33.137 [x-App2Service-1] invoke
16:06:33.167 [       worker-1] after invoke 'app2Service'
16:06:33.172 [       worker-1] after start processing
16:07:02.816 [nio-9001-exec-1] Exception Processing ErrorPage[errorCode=0, location=/error]

java.lang.IllegalStateException: Cannot forward after response has been committed
    at org.apache.catalina.core.ApplicationDispatcher.doForward(ApplicationDispatcher.java:328)
    at org.apache.catalina.core.ApplicationDispatcher.forward(ApplicationDispatcher.java:318)
    at org.apache.catalina.core.StandardHostValve.custom(StandardHostValve.java:439)
    at org.apache.catalina.core.StandardHostValve.status(StandardHostValve.java:305)
    at org.apache.catalina.core.StandardHostValve.throwable(StandardHostValve.java:399)
    at org.apache.catalina.core.AsyncContextImpl.setErrorState(AsyncContextImpl.java:438)
    at org.apache.catalina.connector.CoyoteAdapter.asyncDispatch(CoyoteAdapter.java:291)
    at org.apache.coyote.http11.AbstractHttp11Processor.asyncDispatch(AbstractHttp11Processor.java:1709)
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:649)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1521)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1478)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:745)

变体 2 的日志输出(第一次调用)

16:07:54.465 [nio-9001-exec-1] before invoke 'app1Service'
16:07:54.472 [nio-9001-exec-1] before start processing
16:07:54.500 [nio-9001-exec-1] after start processing
16:07:54.500 [nio-9001-exec-1] after invoke 'app1Service'
16:07:54.517 [       worker-1] do some expensive stuff
16:07:55.522 [       worker-1] finish some expensive stuff
16:07:55.522 [       worker-1] before invoke 'app2Service'
16:07:55.684 [x-App2Service-1] before invoke remote service
16:07:55.685 [x-App2Service-1] after invoke remote service
16:07:55.686 [x-App2Service-1] invoke
16:07:55.717 [       worker-1] after invoke 'app2Service'
16:08:05.786 [       worker-1] next (from 'app2Service')
16:08:05.786 [       worker-1] do some more expensive stuff with 'value from app2 service'
16:08:07.791 [       worker-1] finish some more expensive stuff with 'value from app2 service'
16:08:07.791 [       worker-1] completed (from 'app2Service')
16:08:07.791 [       worker-1] next (processing)
16:08:07.792 [       worker-1] completed (processing)

WT 的线程转储(使用变体 1 后)

"worker-1" #24 prio=5 os_prio=31 tid=0x00007fe2be8cf000 nid=0x5e03 waiting on condition [0x0000000123413000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000006c0d68fb0> (a org.springframework.util.concurrent.ListenableFutureTask)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
    at java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:122)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:110)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - <0x00000006c0d68170> (a java.util.concurrent.ThreadPoolExecutor$Worker)

WT 的线程转储(使用变体 2 后)

"worker-1" #24 prio=5 os_prio=31 tid=0x00007fc6136dd800 nid=0x5207 waiting on condition [0x000000012d638000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000006c02f5388> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

解决方案

异步拦截器使用简单的Future,无法处理ListenableFuture。在我再次查看线程转储后,我注意到FutureTask.get 获取。这是一个阻塞调用。这意味着当仅使用 1 个线程时,变体 1 是一个内置死锁。

此代码有效:

控制器

@ResponseBody
@RequestMapping("/value")
public ListenableFuture<String> value() 
    final SettableListenableFuture<String> future;
    this.app1Service.value(future);
    return future;

服务

@Async
public void value(final SettableListenableFuture<String> future) 
    this.doSomeStuff();
    this.valueFromApp2Service().subscribe(future::set, future::setException);

【问题讨论】:

在代码示例中添加导入会很好...... 【参考方案1】:

我想你已经回答了你的问题(或者我遗漏了一些东西):

如果我多次使用变体 1,则第二个(以及以下所有 请求),在第 2 步(而不是第 6 步)中失败,因为线程 pool (size = 1) 仍然被 ListenableFuture (stack 跟踪如下)。

您的TaskExecutor 只有 1 个线程可用,用于@Async。然后,从该线程中,您想再次将 TaskExecutor 用于 Observable:

    this.app2Service.value().observeOn(Schedulers.from(this.taskExecutor)).subscribe(

但是,您的池中没有更多线程可用。如果您增加 coreSize,或为 RxJava 的东西定义不同的 TaskExecutor,它应该可以工作。

编辑

如果你真的需要异步执行app1Service.value(),可以去掉@Asynch,将任务显式提交给taskExecutor,这样就可以给@987654326添加回调@。如果将结果类型更改为DeferredResult,则可以在执行ListenableFuture 的回调时设置它的结果:

@Autowired
private TaskExecutor taskExecutor;

@ResponseBody
@RequestMapping("/value")
public DeferredResult<String> value() 

    final DeferredResult<String> dr = new DeferredResult<String>();
    taskExecutor.execute(() -> 
        final ListenableFuture<String> future = app1Service.value();
        future.addCallback(new ListenableFutureCallback<String>() 

            @Override
            public void onSuccess(String result) 
                dr.setResult(result);
            

            @Override
            public void onFailure(Throwable ex) 
                dr.setErrorResult(ex);
            
        );
    );
    return dr;

或者,当然,使用您的解决方案,这样会更好。

【讨论】:

我还描述了变体 2 在相同条件下工作,并且可以同时处理更多请求。简单的解决方案是:我不能使用 ListenableFuture 作为 @Async 方法的返回类型。 Spring 只是使用 Future 功能并调用阻塞的 'get' 方法。 对。 Spring 在AsyncExecutionInterceptor 中调用future.get(),因为这是它唯一能做的事情,它根本没有其他方法可以得到app1Service.value() 的计算结果来调度请求。实际上,我认为您根本不需要@Async,因为您已经在使用 RxJava。

以上是关于意外的异步行为:Springs 的 @Async 与 RxJava的主要内容,如果未能解决你的问题,请参考以下文章

Electron - 服务内容中的Async关键字会导致意外令牌

异步/等待行为问题[重复]

SyntaxError:异步函数中出现意外的保留字“等待”

猫鼬 findById 的异步/等待行为

(节点 J.S.)SyntaxError:异步函数上的意外令牌函数

使用钩子获取数据时的意外行为