在 Web 应用程序中使用 RxJava Observables 无法解释的性能改进不足
Posted
技术标签:
【中文标题】在 Web 应用程序中使用 RxJava Observables 无法解释的性能改进不足【英文标题】:Unexplainable lack of performance improvement using RxJava Observables in Web Applications 【发布时间】:2016-03-21 01:18:15 【问题描述】:我正在执行一些测试来评估使用基于 Observables 的反应式 API 是否有真正的优势,而不是传统的阻塞式 API。
整个例子是available on Githug
令人惊讶的是,结果表明,thoughput 结果是:
最好的:返回封装阻塞操作的Callable
/DeferredResult
的REST 服务。
还不错:阻止 REST 服务。
最糟糕的情况:返回 DeferredResult 的 REST 服务,其结果由 RxJava Observable 设置。
这是我的 Spring WebApp:
应用程序:
@SpringBootApplication
public class SpringNioRestApplication
@Bean
public ThreadPoolTaskExecutor executor()
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
return executor;
public static void main(String[] args)
SpringApplication.run(SpringNioRestApplication.class, args);
同步控制器:
@RestController("SyncRestController")
@Api(value="", description="Synchronous data controller")
public class SyncRestController
@Autowired
private DataService dataService;
@RequestMapping(value="/sync/data", method=RequestMethod.GET, produces="application/json")
@ApiOperation(value = "Gets data", notes="Gets data synchronously")
@ApiResponses(value=@ApiResponse(code=200, message="OK"))
public List<Data> getData()
return dataService.loadData();
AsyncController:同时具有原始 Callable 和 Observable 端点
@RestController
@Api(value="", description="Synchronous data controller")
public class AsyncRestController
@Autowired
private DataService dataService;
private Scheduler scheduler;
@Autowired
private TaskExecutor executor;
@PostConstruct
protected void initializeScheduler()
scheduler = Schedulers.from(executor);
@RequestMapping(value="/async/data", method=RequestMethod.GET, produces="application/json")
@ApiOperation(value = "Gets data", notes="Gets data asynchronously")
@ApiResponses(value=@ApiResponse(code=200, message="OK"))
public Callable<List<Data>> getData()
return ( () -> return dataService.loadData(); );
@RequestMapping(value="/observable/data", method=RequestMethod.GET, produces="application/json")
@ApiOperation(value = "Gets data through Observable", notes="Gets data asynchronously through Observable")
@ApiResponses(value=@ApiResponse(code=200, message="OK"))
public DeferredResult<List<Data>> getDataObservable()
DeferredResult<List<Data>> dr = new DeferredResult<List<Data>>();
Observable<List<Data>> dataObservable = dataService.loadDataObservable();
dataObservable.subscribeOn(scheduler).subscribe( dr::setResult, dr::setErrorResult);
return dr;
DataServiceImpl
@Service
public class DataServiceImpl implements DataService
@Override
public List<Data> loadData()
return generateData();
@Override
public Observable<List<Data>> loadDataObservable()
return Observable.create( s ->
List<Data> dataList = generateData();
s.onNext(dataList);
s.onCompleted();
);
private List<Data> generateData()
List<Data> dataList = new ArrayList<Data>();
for (int i = 0; i < 20; i++)
Data data = new Data("key"+i, "value"+i);
dataList.add(data);
//Processing time simulation
try
Thread.sleep(500);
catch (InterruptedException e)
e.printStackTrace();
return dataList;
我已设置Thread.sleep(500)
延迟以增加服务响应时间。
负载测试的结果是:
与 Callable 异步:700 rps,无错误
>>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/async/data
...
Requests: 0, requests per second: 0, mean latency: 0 ms
Requests: 2839, requests per second: 568, mean latency: 500 ms
Requests: 6337, requests per second: 700, mean latency: 500 ms
Requests: 9836, requests per second: 700, mean latency: 500 ms
...
Completed requests: 41337
Total errors: 0
Total time: 60.002348360999996 s
Requests per second: 689
Total time: 60.002348360999996 s
阻塞:大约 404 rps 但会产生错误
>>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/sync/data
...
Requests: 7683, requests per second: 400, mean latency: 7420 ms
Requests: 9683, requests per second: 400, mean latency: 9570 ms
Requests: 11680, requests per second: 399, mean latency: 11720 ms
Requests: 13699, requests per second: 404, mean latency: 13760 ms
...
Percentage of the requests served within a certain time
50% 8868 ms
90% 22434 ms
95% 24103 ms
99% 25351 ms
100% 26055 ms (longest request)
100% 26055 ms (longest request)
-1: 7559 errors
Requests: 31193, requests per second: 689, mean latency: 14350 ms
Errors: 1534, accumulated errors: 7559, 24.2% of total requests
Async with Observable:不超过 20 rps,更快出错
>>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/observable/data
Requests: 0, requests per second: 0, mean latency: 0 ms
Requests: 90, requests per second: 18, mean latency: 2250 ms
Requests: 187, requests per second: 20, mean latency: 6770 ms
Requests: 265, requests per second: 16, mean latency: 11870 ms
Requests: 2872, requests per second: 521, mean latency: 1560 ms
Errors: 2518, accumulated errors: 2518, 87.7% of total requests
Requests: 6373, requests per second: 700, mean latency: 1590 ms
Errors: 3401, accumulated errors: 5919, 92.9% of total requests
Observable 以 10 的 corePoolSize 执行,但将其增加到 50 也没有任何改进。
有什么解释?
更新:根据 akarnokd 的建议,我进行了以下更改。从服务中的 Object.create 移动到 Object.fromCallable 并在控制器中重用调度程序,但我仍然得到相同的结果。
【问题讨论】:
你能用Observable.fromCallable
代替Observable.create
吗?您对create
的使用似乎很奇怪。此外,Thread.sleep 不保证睡眠量的准确,而是取决于操作系统。在getVideoInfoAsync
中,您正在不必要地反复创建调度程序包装器。
您好 akarnokd,感谢您的评论。有几件事,使用 Observable.create 有什么问题?另外,我不明白“一遍又一遍地创建调度程序包装器”的意思。为了实现它,我按照我看到的here in dzone
一开始你不会调用 s.onCompleted() ,但缺乏处理退订可能也会有问题。此外,您应该查看故障是什么,这也可能表明性能损失的来源。您有一个 TaskExecutor 作为成员字段,但是您使用 Scheduler.wrap 将其包装为 getVideoInfoAsync
的每次调用,我猜这每秒会发生数百次。
嗯,我还在原始示例中添加了s.onCompleted()
,但它也没有改进。我在控制台中看到的唯一错误是o.a.c.c.C.[Tomcat].[localhost] : Exception Processing ErrorPage[errorCode=0, location=/error] java.lang.IllegalStateException: Cannot forward after response has been committed
我将 observable 更改为 Observable.just 并返回一些模拟对象以排除 RxJava 的任何缺陷。由于 RxJava 不做网络,我假设你有一个框架来代替它。根据错误消息,此框架可能配置错误、过时或存在错误。
【参考方案1】:
问题是由某个时候的编程错误引起的。实际上,问题中的示例非常有效。
防止别人出问题的一个警告:小心使用Observable.just(func)
,func实际上是在Observable创建时调用的。所以任何放置在那里的 Thread.sleep 都会阻塞调用线程
@Override
public Observable<List<Data>> loadDataObservable()
return Observable.just(generateData()).delay(500, TimeUnit.MILLISECONDS);
private List<Data> generateData()
List<Data> dataList = new ArrayList<Data>();
for (int i = 0; i < 20; i++)
Data data = new Data("key"+i, "value"+i);
dataList.add(data);
return dataList;
我在RxJava Google group 发起了讨论,他们帮助我解决了问题。
【讨论】:
那么,最终的结果是什么? 结果是 Observable 的性能比只使用 Callable 略好。以上是关于在 Web 应用程序中使用 RxJava Observables 无法解释的性能改进不足的主要内容,如果未能解决你的问题,请参考以下文章
如何在 RxJava 2 和 Android 中延迟 onError()?