RxJava:链接可观察对象
Posted
技术标签:
【中文标题】RxJava:链接可观察对象【英文标题】:RxJava: chaining observables 【发布时间】:2015-01-12 04:31:35 【问题描述】:是否可以使用 RxJava 实现类似下一个链接的东西:
loginObservable()
.then( (someData) ->
// returns another Observable<T> with some long operation
return fetchUserDataObservable(someData);
).then( (userData) ->
// it should be called when fetching user data completed (with userData of type T)
cacheUserData(userData);
).then( (userData) ->
// it should be called after all previous operations completed
displayUserData()
).doOnError( (error) ->
//do something
)
我发现这个库非常有趣,但不知道我们如何在彼此依赖的地方链接请求。
【问题讨论】:
【参考方案1】:当然,RxJava 支持 .map
这样做。来自 RxJava Wiki:
基本上是这样的:
loginObservable()
.switchMap( someData -> fetchUserDataObservable(someData) )
.map( userData -> cacheUserData(userData) )
.subscribe(new Subscriber<YourResult>()
@Override
public void onCompleted()
// observable stream has ended - no more logins possible
@Override
public void onError(Throwable e)
// do something
@Override
public void onNext(YourType yourType)
displayUserData();
);
【讨论】:
据我了解,map 将一个值转换为另一个值(例如,int -> String)。因此,即使我从第一个地图返回 ObservableswitchMap
可能是您想要的。
谢谢,好像 switchMap 我想要的。如果你把它作为答案,我会接受它
@Mikhail 对不起,我的错 - 是 afk 并且没有阅读评论。如果您正在使用 observable,switchMap
确实是您所需要的。
此外,"flatMap" 为您提供典型的链接行为,与链接为 then
的 Promises 相当。【参考方案2】:
这是谷歌搜索 RxJava 链 observables 时的头条帖子,所以我将添加另一个常见情况,即您不想转换收到的数据,但是将其与另一个操作链接起来(例如,将数据设置到数据库中)。使用.flatmap()
。这是一个例子:
mDataManager
.fetchQuotesFromApi(limit)
.subscribeOn(mSchedulerProvider.io())
.observeOn(mSchedulerProvider.ui())
// OnErrorResumeNext and Observable.error() would propagate the error to
// the next level. So, whatever error occurs here, would get passed to
// onError() on the UI side.
.onErrorResumeNext(Function Observable.error<List<Quote>>(it) )
.flatMap t: List<Quote> ->
// Chain observable as such
mDataManager.setQuotesToDb(t).subscribe(
,
e "setQuotesToDb() error occurred: $it.localizedMessage" ,
d "Done server set"
)
Observable.just(t)
.subscribeBy(
onNext = ,
onError = mvpView?.showError("No internet connection") ,
onComplete = d "onComplete(): done with fetching quotes from api"
)
这是 RxKotlin2,但思路与 RxJava & RxJava2 相同:
快速解释:
我们尝试从带有mDataManager.fetchQuotesFromApi()
的api 中获取一些数据(本例中的引号)
我们订阅 observable 以在 .io()
线程上执行操作,并在 .ui()
线程上显示结果。
onErrorResumeNext()
确保我们在获取数据时遇到的任何错误都被此方法捕获。我想在出现错误时终止整个链,所以我返回一个Observable.error()
.flatmap()
是链接部分。我希望能够将从 API 获得的任何数据设置到我的数据库中。我没有转换使用.map()
收到的数据,我只是在其他使用该数据而不转换它。
我订阅了最后一个 observables 链。如果在获取数据(第一个 observable)时发生错误,它将被处理(在这种情况下,传播到订阅的 onError()
)onErrorResumeNext()
我非常清楚我正在订阅 DB observable(在 flatmap()
内)。通过此 observable 发生的任何错误不会传播到最后的 subscribeBy()
方法,因为它是在 .flatmap()
链内的 subscribe()
方法内处理的。
代码来自this project,位于此处:https://github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt
【讨论】:
您可以使用 map() 并简单地返回 t,而不是使用 flatMap() 并返回 Observable.just(t)。请注意,这两个版本都不会等待 mDataManager.setQuotesToDb() 的结果并在完成之前返回。【参考方案3】:尝试使用 scan()
Flowable.fromArray(array).scan(...).subscribe(...)
【讨论】:
以上是关于RxJava:链接可观察对象的主要内容,如果未能解决你的问题,请参考以下文章