RxJava Observable.create 包装可观察订阅

Posted

技术标签:

【中文标题】RxJava Observable.create 包装可观察订阅【英文标题】:RxJava Observable.create wrapping observable subscriptions 【发布时间】:2017-11-14 03:31:19 【问题描述】:

我使用 Observable.create 以便在某些数据可用时通知订阅者。我有点不确定在我的 create 方法中订阅 observables。这些嵌套订阅会给我带来任何问题吗?我对使用 Observable.create 创建可观察对象并不完全熟悉,所以我想确保我没有做任何不寻常的事情或滥用它。提前谢谢!

abstract class NetworkResource<ApiType, DbType> constructor(private val schedulerProvider: SchedulerProvider) 

    abstract fun fetchFromApi(): Single<ApiType>
    abstract fun fetchFromDb(): Observable<Optional<DbType>>
    abstract fun saveToDb(apiType: ApiType?)
    abstract fun shouldFetchFromApi(cache: DbType?): Boolean

    fun fetch(): Observable<Optional<DbType>>  
        return Observable.create<Optional<DbType>> 
            val subscriber = it

            fetchFromDb()
                    .subscribe(
                        subscriber.onNext(it)

                        if(shouldFetchFromApi(it.get())) 
                            fetchFromApi()
                                    .observeOn(schedulerProvider.io())
                                    .map 
                                        saveToDb(it)
                                        it
                                    
                                    .observeOn(schedulerProvider.ui())
                                    .flatMapObservable 
                                        fetchFromDb()
                                    
                                    .subscribe(
                                        subscriber.onNext(it)
                                        subscriber.onComplete()
                                    )
                        
                        else 
                            subscriber.onComplete()
                        
                    )

        
    

【问题讨论】:

【参考方案1】:

是的,这会导致问题。

首先,像这样嵌套Observable 是不习惯的,Reactive 方法的优点之一是组合Observables,因此具有单个干净的流。用这种方式,你打破了链条,直接的结果是更难阅读的交织代码,以及更多用于连接通知事件的代码,基本上就像用Observable包装异步回调方法一样。 在这里,因为您已经有了响应式组件,您可以简单地组合它们,而不是用回调方法处理它们。

其次,由于链的中断,最直接和最直接的一个 - 取消订阅外部Observable 不会自动影响内部Observable。尝试添加subscribeOn() 也是如此,并且在背压很重要的不同场景中也适用。

另一种作曲方式可能是这样的:

fun fetch2(): Observable<Optional<DbType>> 
        return fetchFromDb()
                .flatMap 
                    if (shouldFetchFromApi(it.get())) 
                        fetchFromApi()
                                .observeOn(schedulerProvider.io())
                                .doOnSuccess  saveToDb(it) 
                                .observeOn(schedulerProvider.ui())
                                .flatMapObservable 
                                    fetchFromDb()
                                

                     else 
                        Observable.empty()
                    
                
    

如果出于某种原因,您无论如何都希望单独发出第一个 fetchFromDb() 结果,您也可以使用带有选择器的 publish() 来实现:

 fun fetch2(): Observable<Optional<DbType>> 
    return fetchFromDb()
            .publish 
                Observable.merge(it,
                        it.flatMap 
                            if (shouldFetchFromApi(it.get())) 
                                fetchFromApi()
                                        .observeOn(schedulerProvider.io())
                                        .doOnSuccess  saveToDb(it) 
                                        .observeOn(schedulerProvider.ui())
                                        .flatMapObservable 
                                            fetchFromDb()
                                        

                             else 
                                Observable.empty()
                            
                        )
            


【讨论】:

太棒了,谢谢!您能否举例说明使用 publish() 的含义? 示例添加了发布选择器,通过这种方式,我们将获得 fetchFromDb() 的原始结果和结果 flatMapped fetchFromDb 非常感谢您的帮助。

以上是关于RxJava Observable.create 包装可观察订阅的主要内容,如果未能解决你的问题,请参考以下文章

RxJava源码浅析

RxJava源码浅析

Rxjava 流程分析

Rxjava 流程分析

RxJava 操作符

RxJava中的Observable,在smartConfig中的应用