Rxjava 2 - 使用领域和改造获取数据
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava 2 - 使用领域和改造获取数据相关的知识,希望对你有一定的参考价值。
任务:我想先获取本地数据。如果返回的数据为null,我想调用API服务。
主持人:
private fun getData() {
val disposable = dataRepository.getDataFromRepository(String: itemId)
.observeOn(androidSchedulers.mainThread())
.subscribe({ _ ->
// I got my data
}, {
// error
})
compositeDisposable.add(disposable)
}
DataRepository:
fun getDataFromRepository(itemId: String): Single<Data> {
val dataAsSingle: Single<Event>
//fetch data locally
dataAsSingle = getDataLocally(itemId)
//if nothing was found locally, call api service to fetch data
if (dataAsSingle == null) {
getDataFromApi(itemId)
}
return dataAsSingle
}
private fun getDataLocally(itemId: String): Single<Data> {
var data: Data? = null
val realm = Realm.getDefaultInstance()
data = realm.where(Data::class.java).equalTo(itemId)
// some other logic..
realm.close()
return data?.let {
Single.just(data)
} ?: Single.error(NoSuchEventFoundException())
}
private fun getDataFromApi(itemId: String): Single<Data> {
return dataService.getDataFromApiCall(itemId)
.onErrorResumeNext(ErrorHandler(BaseErrorParser()))
.map { apiResponse ->
//some logic blah blah blah...
return@map data
}
}
}
好吧,我认为我的做法是错误的,但我不能初步告诉你为什么......这样做没问题,但似乎它跳过了RxJava的要点。也许有人可以解释如何使用RxJava 2以正确的模式和方法解决这个问题?
更新
DataRepository:
fun getDataFromRepository(itemId: String): Single<Data> {
val localData: Observable<Data> = getDataLocally(itemId)
val remoteData: Observable<Data> = getDataFromApi(itemId)
.doOnNext(
})
return Observable
.concat(localEvent, remoteEvent)
.first()
}
private fun getDataLocally(itemId: String): Observable<Data> {
var data: Data? = null
val realm = Realm.getDefaultInstance()
data = realm.where(Data::class.java).equalTo(itemId)
// some other logic..
realm.close()
return Observable.just(data)
}
private fun getDataFromApi(itemId: String): Observable<Data> {
return dataService.getDataFromApiCall(itemId)
.map { apiResponse ->
//some logic blah blah blah...
return@map data
}
}
}
答案
实际上,使用Realm执行此操作的正确方法是侦听Realm中的更改,如果Realm不包含您要查找的内容,则从API中的后台线程更新Realm
(限制在UI线程中使用此方法,以便将RealmResults接收到UI线程 - 这样您就可以使用异步事务并侦听Realm中的更改):
private fun getData() {
val disposable = dataRepository.getDataFromRepository(String: itemId)
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ _ ->
// I got my data
}, {
// error
})
compositeDisposable.add(disposable)
}
哪里:
fun getDataFromRepository(itemId: String): Observable<Data> =
Observable.create { emitter ->
val realm = Realm.getDefaultInstance()
val results = realm.where<Data>().equalTo("itemId", itemId).findAllAsync()
val listener = RealmChangeListener { results ->
if(!emitter.isDisposed()) {
emitter.onNext(results)
}
}
emitter.setDisposable(Disposable {
results.removeChangeListener(listener)
realm.close()
})
results.addChangeListener(listener)
}.subscribeOn(AndroidSchedulers.mainThread())
.doOnNext { results ->
if(results.isEmpty()) {
dataService.getDataFromApiCall(itemId)
// .toSingle() // ensure this completes automatically, but only if it's not a single already
.subscribeOn(Schedulers.io())
.subscribeWith(object: DisposableObserver<>() {
fun onNext(data: Data) {
Realm.getDefaultInstance().use { realm ->
realm.executeTransaction { realm ->
realm.insert(data)
}
}
}
fun onError(throwable: Throwable) {
... /* do something */
}
)
}
}
}
另一答案
您可以使用concat
运算符。重要的是你不应该从本地源返回null值。您应该返回Empty Observable
以进行远程呼叫。这是一个示例代码:
final Observable<Page> localResult = mSearchLocalDataSource.search(query);
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
.doOnNext(new Action1<Page>() {
@Override
public void call(Page page) {
if (page != null) {
mSearchLocalDataSource.save(query, page);
mResultCache.put(query, page);
}
}
});
return Observable.concat(localResult, remoteResult)
.first()
.map(new Func1<Page, Page>() {
@Override
public Page call(Page page) {
if (page == null) {
throw new NoSuchElementException("No result found!");
}
return page;
}
});
以上是关于Rxjava 2 - 使用领域和改造获取数据的主要内容,如果未能解决你的问题,请参考以下文章