RxJava2:如何改进并行数据下载和缓存?

Posted

技术标签:

【中文标题】RxJava2:如何改进并行数据下载和缓存?【英文标题】:RxJava2: How to improve parallel data downloading and caching? 【发布时间】:2017-04-10 19:36:17 【问题描述】:

我正在努力通过 RxJava2。我想知道我的解决方案是否可以接受,或者有什么方法可以改进它。

用例

    用户按下更新数据按钮 显示一个对话框 - 请稍候 并行处理多个后端调用 一旦完成这些 - 数据将保存在本地数据库中 在完成所有请求(后端调用和持久化)后,应关闭对话框

当前解决方案

我有几个Completables 看起来像这样:

Completable organisationUnitCompletable = backendService.getOrganisationUnits()
    .doOnNext(data -> organisationUnitDao.saveInTx(data))
    .ignoreElements()
    .subscribeOn(Schedulers.io());

Completable locationCompletable = backendService.getLocations()
    .doOnNext(data -> locationDao.saveInTx(data))
    .ignoreElements()
    .subscribeOn(Schedulers.io());

Completable prioritiesCompletable = backendService.getPriorities()
    .doOnNext(data -> priorityDao.saveInTx(data))
    .ignoreElements()
    .subscribeOn(Schedulers.io());

我通过添加到列表并使用merge 运算符将它们打包成一个:

List<Completable> compatibles = new ArrayList<>();
compatibles.add(organisationUnitCompletable);
compatibles.add(locationCompletable);
compatibles.add(prioritiesCompletable);

Completable.merge(compatibles)
.subscribeOn(Schedulers.io())
.observeOn(androidSchedulers.mainThread())
.subscribe(() -> 
     progressDialog.dismiss();
);

可能的改进

好的,这样可以按预期工作。但有些事情我并不那么高兴。

我真的必须将subscribeOn(Schedulers.io()) 添加到每个 Completable 中吗?没有它就不能并行工作,但也许有更好的方法来做到这一点?

所有可完成项都有这些行。

    .ignoreElements()
    .subscribeOn(Schedulers.io());

有没有办法将它提取到一种方法中?我尝试过这样的事情:

private <T> Completable prepareCompletable(Function<Void, Observable<List<T>>> source, AbstractDao<T, Long> dao) 

    Completable orderTypeCompletable = source
            .doOnNext(data -> dao.saveInTx(data))
            .ignoreElements()
            .subscribeOn(Schedulers.io());

我只是将 Observable 和 DAO 放入其中。当然编译不了。看来它需要的泛型知识比我已有的要多得多。

抱歉这个问题太长了,很难用几句话来解释整个用例。

【问题讨论】:

很好很清楚的问题。谢谢。 【参考方案1】:

我真的必须将 subscribeOn(Schedulers.io()) 添加到每个 Completable 吗?

是的,但在Completable.merge() 之后就不需要了。

有没有办法将它提取到一个方法中?

public static <T> Function<Flowable<T>, Completable> applyIgnore() 
    return f -> f.ignoreElements().subscribeOn(Schedulers.io());


Completable locationCompletable = backendService.getLocations()
.doOnNext(data -> locationDao.saveInTx(data))
.to(applyIgnore());

【讨论】:

以上是关于RxJava2:如何改进并行数据下载和缓存?的主要内容,如果未能解决你的问题,请参考以下文章

基于并行下载算法和动态缓存池的高性能WebGIS数据存取与显示关键技术研究

基于RxJava2+Retrofit2精心打造的Android基础框架XSnow

基于RxJava2+Retrofit2精心打造的Android基础框架XSnow

如何使用 RxJava 2 改进从 Firebase db 读取的 Flowable<Object> 数据?

一款基于RxJava2+Retrofit2实现简单易用的网络请求框架

centos设置缓存盘