Rxjava 2 - 使用领域和改造获取数据

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava 2 - 使用领域和改造获取数据相关的知识,希望对你有一定的参考价值。

任务:我想先获取本地数据。如果返回的数据为null,我想调用API服务。

主持人:

private fun getData() {
    val disposable = dataRepository.getDataFromRepository(String: itemId)
            .observeOn(androidSchedulers.mainThread())
            .subscribe({ _ ->
                // I got my data
            }, {
                // error
            })
    compositeDisposable.add(disposable)
}

DataRepository:

fun getDataFromRepository(itemId: String): Single<Data> {
    val dataAsSingle: Single<Event>

    //fetch data locally
    dataAsSingle = getDataLocally(itemId)

    //if nothing was found locally, call api service to fetch data
    if (dataAsSingle == null) {
        getDataFromApi(itemId)
    }

    return dataAsSingle
}

private fun getDataLocally(itemId: String): Single<Data> {
    var data: Data? = null

    val realm = Realm.getDefaultInstance()
        data = realm.where(Data::class.java).equalTo(itemId)
    // some other logic..
    realm.close()

    return data?.let {
        Single.just(data)
    } ?: Single.error(NoSuchEventFoundException())
}

private fun getDataFromApi(itemId: String): Single<Data> {
    return dataService.getDataFromApiCall(itemId)
            .onErrorResumeNext(ErrorHandler(BaseErrorParser()))
            .map { apiResponse ->
                    //some logic blah blah blah...
                    return@map data
                } 
            }
}

好吧,我认为我的做法是错误的,但我不能初步告诉你为什么......这样做没问题,但似乎它跳过了RxJava的要点。也许有人可以解释如何使用RxJava 2以正确的模式和方法解决这个问题?

更新

DataRepository:

fun getDataFromRepository(itemId: String): Single<Data> {

    val localData: Observable<Data> = getDataLocally(itemId)
    val remoteData: Observable<Data> = getDataFromApi(itemId)
           .doOnNext(

            })

    return Observable
            .concat(localEvent, remoteEvent)
            .first()
}

private fun getDataLocally(itemId: String): Observable<Data> {
    var data: Data? = null

    val realm = Realm.getDefaultInstance()
        data = realm.where(Data::class.java).equalTo(itemId)
    // some other logic..
    realm.close()

    return Observable.just(data)
}

private fun getDataFromApi(itemId: String): Observable<Data> {
    return dataService.getDataFromApiCall(itemId)
            .map { apiResponse ->
                    //some logic blah blah blah...
                    return@map data
                } 
            }
}
答案

实际上,使用Realm执行此操作的正确方法是侦听Realm中的更改,如果Realm不包含您要查找的内容,则从API中的后台线程更新Realm

(限制在UI线程中使用此方法,以便将RealmResults接收到UI线程 - 这样您就可以使用异步事务并侦听Realm中的更改):

private fun getData() {
    val disposable = dataRepository.getDataFromRepository(String: itemId)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ _ ->
                // I got my data
            }, {
                // error
            })
    compositeDisposable.add(disposable)
}

哪里:

fun getDataFromRepository(itemId: String): Observable<Data> =
    Observable.create { emitter ->
        val realm = Realm.getDefaultInstance()
        val results = realm.where<Data>().equalTo("itemId", itemId).findAllAsync()
        val listener = RealmChangeListener { results ->
            if(!emitter.isDisposed()) {
                emitter.onNext(results)
            }
        }
        emitter.setDisposable(Disposable {
            results.removeChangeListener(listener)
            realm.close()
        })
        results.addChangeListener(listener)
    }.subscribeOn(AndroidSchedulers.mainThread())
    .doOnNext { results ->
        if(results.isEmpty()) {
            dataService.getDataFromApiCall(itemId)
                       // .toSingle() // ensure this completes automatically, but only if it's not a single already
                       .subscribeOn(Schedulers.io())
                       .subscribeWith(object: DisposableObserver<>() {
                           fun onNext(data: Data) {
                               Realm.getDefaultInstance().use { realm ->
                                  realm.executeTransaction { realm ->
                                      realm.insert(data)
                                   }
                               }
                           }

                           fun onError(throwable: Throwable) {
                               ... /* do something */ 
                           }
                      )
        }
    }
}
另一答案

您可以使用concat运算符。重要的是你不应该从本地源返回null值。您应该返回Empty Observable以进行远程呼叫。这是一个示例代码:

final Observable<Page> localResult = mSearchLocalDataSource.search(query);
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
                .doOnNext(new Action1<Page>() {
                    @Override
                    public void call(Page page) {
                        if (page != null) {
                            mSearchLocalDataSource.save(query, page);
                            mResultCache.put(query, page);
                        }
                    }
                });

        return Observable.concat(localResult, remoteResult)
                .first()
                .map(new Func1<Page, Page>() {
                    @Override
                    public Page call(Page page) {
                        if (page == null) {
                            throw new NoSuchElementException("No result found!");
                        }
                        return page;
                    }
});

以上是关于Rxjava 2 - 使用领域和改造获取数据的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava,改造和多次调用

从retrofit2和rxjava中的错误获取url

使用 rxJava 和改造多次调用另一个请求中的请求

带有 rxJava2 和改造的 UndeliverableException

RxJava 和改造的单元测试

使用改造对 mvvm 中的 Rxjava 进行错误处理