在 Web 应用程序中使用 RxJava Observables 无法解释的性能改进不足

Posted

技术标签:

【中文标题】在 Web 应用程序中使用 RxJava Observables 无法解释的性能改进不足【英文标题】:Unexplainable lack of performance improvement using RxJava Observables in Web Applications 【发布时间】:2016-03-21 01:18:15 【问题描述】:

我正在执行一些测试来评估使用基于 Observables 的反应式 API 是否有真正的优势,而不是传统的阻塞式 API。

整个例子是available on Githug

令人惊讶的是,结果表明,thoughput 结果是:

最好的:返回封装阻塞操作的Callable/DeferredResult 的REST 服务。

还不错:阻止 REST 服务。

最糟糕的情况:返回 DeferredResult 的 REST 服务,其结果由 RxJava Observable 设置。

这是我的 Spring WebApp:

应用程序

@SpringBootApplication
public class SpringNioRestApplication 

   @Bean
    public ThreadPoolTaskExecutor executor()
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        return executor;
    

    public static void main(String[] args) 
        SpringApplication.run(SpringNioRestApplication.class, args);
    

同步控制器

@RestController("SyncRestController")
@Api(value="", description="Synchronous data controller")
public class SyncRestController 

    @Autowired
    private DataService dataService;

    @RequestMapping(value="/sync/data", method=RequestMethod.GET, produces="application/json")
    @ApiOperation(value = "Gets data", notes="Gets data synchronously")
    @ApiResponses(value=@ApiResponse(code=200, message="OK"))
    public List<Data> getData()
        return dataService.loadData();
    

AsyncController:同时具有原始 Callable 和 Observable 端点

@RestController
@Api(value="", description="Synchronous data controller")
public class AsyncRestController 

    @Autowired
    private DataService dataService;

    private Scheduler scheduler;

    @Autowired
    private TaskExecutor executor;

     @PostConstruct
    protected void initializeScheduler()
        scheduler = Schedulers.from(executor);
    

    @RequestMapping(value="/async/data", method=RequestMethod.GET, produces="application/json")
    @ApiOperation(value = "Gets data", notes="Gets data asynchronously")
    @ApiResponses(value=@ApiResponse(code=200, message="OK"))
    public Callable<List<Data>> getData()
        return ( () -> return dataService.loadData(); );
    

    @RequestMapping(value="/observable/data", method=RequestMethod.GET, produces="application/json")
     @ApiOperation(value = "Gets data through Observable", notes="Gets data asynchronously through Observable")
     @ApiResponses(value=@ApiResponse(code=200, message="OK"))
     public DeferredResult<List<Data>> getDataObservable()
         DeferredResult<List<Data>> dr = new DeferredResult<List<Data>>();
         Observable<List<Data>> dataObservable = dataService.loadDataObservable();
         dataObservable.subscribeOn(scheduler).subscribe( dr::setResult, dr::setErrorResult);
         return dr;
     

DataServiceImpl

@Service
public class DataServiceImpl implements DataService

    @Override
    public List<Data> loadData() 
        return generateData();
    

    @Override
    public Observable<List<Data>> loadDataObservable() 
        return Observable.create( s -> 
            List<Data> dataList = generateData();
            s.onNext(dataList);
            s.onCompleted();
        );
    

    private List<Data> generateData()
        List<Data> dataList = new ArrayList<Data>();
        for (int i = 0; i < 20; i++) 
            Data data = new Data("key"+i, "value"+i);
            dataList.add(data);
        
        //Processing time simulation
        try 
            Thread.sleep(500);
         catch (InterruptedException e) 
            e.printStackTrace();
        
        return dataList;
    

我已设置Thread.sleep(500) 延迟以增加服务响应时间。

负载测试的结果是:

与 Callable 异步:700 rps,无错误

>>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/async/data    
...
Requests: 0, requests per second: 0, mean latency: 0 ms
Requests: 2839, requests per second: 568, mean latency: 500 ms
Requests: 6337, requests per second: 700, mean latency: 500 ms
Requests: 9836, requests per second: 700, mean latency: 500 ms
...
Completed requests:  41337
Total errors:        0
Total time:          60.002348360999996 s
Requests per second: 689
Total time:          60.002348360999996 s

阻塞:大约 404 rps 但会产生错误

>>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/sync/data    
...
Requests: 7683, requests per second: 400, mean latency: 7420 ms
Requests: 9683, requests per second: 400, mean latency: 9570 ms
Requests: 11680, requests per second: 399, mean latency: 11720 ms
Requests: 13699, requests per second: 404, mean latency: 13760 ms
...
Percentage of the requests served within a certain time
  50%      8868 ms
  90%      22434 ms
  95%      24103 ms
  99%      25351 ms
 100%      26055 ms (longest request)

 100%      26055 ms (longest request)

   -1:   7559 errors
Requests: 31193, requests per second: 689, mean latency: 14350 ms
Errors: 1534, accumulated errors: 7559, 24.2% of total requests

Async with Observable不超过 20 rps,更快出错

>>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/observable/data
Requests: 0, requests per second: 0, mean latency: 0 ms
Requests: 90, requests per second: 18, mean latency: 2250 ms
Requests: 187, requests per second: 20, mean latency: 6770 ms
Requests: 265, requests per second: 16, mean latency: 11870 ms
Requests: 2872, requests per second: 521, mean latency: 1560 ms
Errors: 2518, accumulated errors: 2518, 87.7% of total requests
Requests: 6373, requests per second: 700, mean latency: 1590 ms
Errors: 3401, accumulated errors: 5919, 92.9% of total requests 

Observable 以 10 的 corePoolSize 执行,但将其增加到 50 也没有任何改进。

有什么解释?

更新:根据 akarnokd 的建议,我进行了以下更改。从服务中的 Object.create 移动到 Object.fromCallable 并在控制器中重用调度程序,但我仍然得到相同的结果。

【问题讨论】:

你能用Observable.fromCallable代替Observable.create吗?您对create 的使用似乎很奇怪。此外,Thread.sleep 不保证睡眠量的准确,而是取决于操作系统。在getVideoInfoAsync 中,您正在不必要地反复创建调度程序包装器。 您好 akarnokd,感谢您的评论。有几件事,使用 Observable.create 有什么问题?另外,我不明白“一遍又一遍地创建调度程序包装器”的意思。为了实现它,我按照我看到的here in dzone 一开始你不会调用 s.onCompleted() ,但缺乏处理退订可能也会有问题。此外,您应该查看故障是什么,这也可能表明性能损失的来源。您有一个 TaskExecutor 作为成员字段,但是您使用 Scheduler.wrap 将其包装为 getVideoInfoAsync 的每次调用,我猜这每秒会发生数百次。 嗯,我还在原始示例中添加了s.onCompleted(),但它也没有改进。我在控制台中看到的唯一错误是o.a.c.c.C.[Tomcat].[localhost] : Exception Processing ErrorPage[errorCode=0, location=/error] java.lang.IllegalStateException: Cannot forward after response has been committed 我将 observable 更改为 Observable.just 并返回一些模拟对象以排除 RxJava 的任何缺陷。由于 RxJava 不做网络,我假设你有一个框架来代替它。根据错误消息,此框架可能配置错误、过时或存在错误。 【参考方案1】:

问题是由某个时候的编程错误引起的。实际上,问题中的示例非常有效。

防止别人出问题的一个警告:小心使用Observable.just(func),func实际上是在Observable创建时调用的。所以任何放置在那里的 Thread.sleep 都会阻塞调用线程

@Override
public Observable<List<Data>> loadDataObservable() 
    return Observable.just(generateData()).delay(500, TimeUnit.MILLISECONDS);


private List<Data> generateData()
    List<Data> dataList = new ArrayList<Data>();
    for (int i = 0; i < 20; i++) 
        Data data = new Data("key"+i, "value"+i);
        dataList.add(data);
    
    return dataList;

我在RxJava Google group 发起了讨论,他们帮助我解决了问题。

【讨论】:

那么,最终的结果是什么? 结果是 Observable 的性能比只使用 Callable 略好。

以上是关于在 Web 应用程序中使用 RxJava Observables 无法解释的性能改进不足的主要内容,如果未能解决你的问题,请参考以下文章

RX

如何在 RxJava 2 和 Android 中延迟 onError()?

Android RxJava使用介绍概念

RxJava 和 RxAndroid 四(RxBinding的使用)

MVP模式入门(结合Rxjava,Retrofit)

Android RxJava使用介绍 RxJava的操作符