RxJava:链接可观察对象

Posted

技术标签:

【中文标题】RxJava:链接可观察对象【英文标题】:RxJava: chaining observables 【发布时间】:2015-01-12 04:31:35 【问题描述】:

是否可以使用 RxJava 实现类似下一个链接的东西:

loginObservable()
   .then( (someData) -> 
      // returns another Observable<T> with some long operation
      return fetchUserDataObservable(someData);

   ).then( (userData) -> 
      // it should be called when fetching user data completed (with userData of type T)
      cacheUserData(userData);

   ).then( (userData) -> 
      // it should be called after all previous operations completed
      displayUserData()

   ).doOnError( (error) -> 
      //do something
   )

我发现这个库非常有趣,但不知道我们如何在彼此依赖的地方链接请求。

【问题讨论】:

【参考方案1】:

当然,RxJava 支持 .map 这样做。来自 RxJava Wiki:

基本上是这样的:

loginObservable()
   .switchMap( someData -> fetchUserDataObservable(someData) )
   .map( userData -> cacheUserData(userData) )
   .subscribe(new Subscriber<YourResult>() 
        @Override
        public void onCompleted() 
           // observable stream has ended - no more logins possible
        
        @Override
        public void onError(Throwable e) 
            // do something
        
        @Override
        public void onNext(YourType yourType) 
            displayUserData();
        
    );

【讨论】:

据我了解,map 将一个值转换为另一个值(例如,int -> String)。因此,即使我从第一个地图返回 Observable,第二个地图也会立即使用 Observable 类型的 userData 调用,但我希望在 fetchUserData 完成时使用 T 类型调用它 我认为switchMap 可能是您想要的。 谢谢,好像 switchMap 我想要的。如果你把它作为答案,我会接受它 @Mikhail 对不起,我的错 - 是 afk 并且没有阅读评论。如果您正在使用 observable,switchMap 确实是您所需要的。 此外,"flatMap" 为您提供典型的链接行为,与链接为 then 的 Promises 相当。【参考方案2】:

这是谷歌搜索 RxJava 链 observables 时的头条帖子,所以我将添加另一个常见情况,即您不想转换收到的数据,但是将其与另一个操作链接起来(例如,将数据设置到数据库中)。使用.flatmap()。这是一个例子:

mDataManager
    .fetchQuotesFromApi(limit)
    .subscribeOn(mSchedulerProvider.io())
    .observeOn(mSchedulerProvider.ui())
    // OnErrorResumeNext and Observable.error() would propagate the error to
    // the next level. So, whatever error occurs here, would get passed to
    // onError() on the UI side.
    .onErrorResumeNext(Function  Observable.error<List<Quote>>(it) )
    .flatMap  t: List<Quote> ->
        // Chain observable as such
        mDataManager.setQuotesToDb(t).subscribe(
            ,
             e  "setQuotesToDb() error occurred: $it.localizedMessage"  ,
             d  "Done server set"  
        )
        Observable.just(t)
    
    .subscribeBy(
        onNext = ,
        onError =  mvpView?.showError("No internet connection") ,
        onComplete =  d  "onComplete(): done with fetching quotes from api"  
    )

这是 RxKotlin2,但思路与 RxJava & RxJava2 相同:

快速解释:

我们尝试从带有mDataManager.fetchQuotesFromApi() 的api 中获取一些数据(本例中的引号) 我们订阅 observable 以在 .io() 线程上执行操作,并在 .ui() 线程上显示结果。 onErrorResumeNext() 确保我们在获取数据时遇到的任何错误都被此方法捕获。我想在出现错误时终止整个链,所以我返回一个Observable.error() .flatmap() 是链接部分。我希望能够将从 API 获得的任何数据设置到我的数据库中。我没有转换使用.map() 收到的数据,我只是在其他使用该数据而不转换它。 我订阅了最后一个 observables 链。如果在获取数据(第一个 observable)时发生错误,它将被处理(在这种情况下,传播到订阅的 onError()onErrorResumeNext() 我非常清楚我正在订阅 DB observable(在 flatmap() 内)。通过此 observable 发生的任何错误不会传播到最后的 subscribeBy() 方法,因为它是在 .flatmap() 链内的 subscribe() 方法内处理的。

代码来自this project,位于此处:https://github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt

【讨论】:

您可以使用 map() 并简单地返回 t,而不是使用 flatMap() 并返回 Observable.just(t)。请注意,这两个版本都不会等待 mDataManager.setQuotesToDb() 的结果并在完成之前返回。【参考方案3】:

尝试使用 scan()

Flowable.fromArray(array).scan(...).subscribe(...)

【讨论】:

以上是关于RxJava:链接可观察对象的主要内容,如果未能解决你的问题,请参考以下文章

在对对象进行操作时链接递归 RxJS 可观察对象

RxJava Observable.create 包装可观察订阅

RxJava入门

如何测试 rxjava 链接?

RxJava使用入门

弄懂 RxJava