RxJava Observable.create 包装可观察订阅
Posted
技术标签:
【中文标题】RxJava Observable.create 包装可观察订阅【英文标题】:RxJava Observable.create wrapping observable subscriptions 【发布时间】:2017-11-14 03:31:19 【问题描述】:我使用 Observable.create 以便在某些数据可用时通知订阅者。我有点不确定在我的 create 方法中订阅 observables。这些嵌套订阅会给我带来任何问题吗?我对使用 Observable.create 创建可观察对象并不完全熟悉,所以我想确保我没有做任何不寻常的事情或滥用它。提前谢谢!
abstract class NetworkResource<ApiType, DbType> constructor(private val schedulerProvider: SchedulerProvider)
abstract fun fetchFromApi(): Single<ApiType>
abstract fun fetchFromDb(): Observable<Optional<DbType>>
abstract fun saveToDb(apiType: ApiType?)
abstract fun shouldFetchFromApi(cache: DbType?): Boolean
fun fetch(): Observable<Optional<DbType>>
return Observable.create<Optional<DbType>>
val subscriber = it
fetchFromDb()
.subscribe(
subscriber.onNext(it)
if(shouldFetchFromApi(it.get()))
fetchFromApi()
.observeOn(schedulerProvider.io())
.map
saveToDb(it)
it
.observeOn(schedulerProvider.ui())
.flatMapObservable
fetchFromDb()
.subscribe(
subscriber.onNext(it)
subscriber.onComplete()
)
else
subscriber.onComplete()
)
【问题讨论】:
【参考方案1】:是的,这会导致问题。
首先,像这样嵌套Observable
是不习惯的,Reactive 方法的优点之一是组合Observables
,因此具有单个干净的流。用这种方式,你打破了链条,直接的结果是更难阅读的交织代码,以及更多用于连接通知事件的代码,基本上就像用Observable
包装异步回调方法一样。
在这里,因为您已经有了响应式组件,您可以简单地组合它们,而不是用回调方法处理它们。
其次,由于链的中断,最直接和最直接的一个 - 取消订阅外部Observable
不会自动影响内部Observable
。尝试添加subscribeOn()
也是如此,并且在背压很重要的不同场景中也适用。
另一种作曲方式可能是这样的:
fun fetch2(): Observable<Optional<DbType>>
return fetchFromDb()
.flatMap
if (shouldFetchFromApi(it.get()))
fetchFromApi()
.observeOn(schedulerProvider.io())
.doOnSuccess saveToDb(it)
.observeOn(schedulerProvider.ui())
.flatMapObservable
fetchFromDb()
else
Observable.empty()
如果出于某种原因,您无论如何都希望单独发出第一个 fetchFromDb()
结果,您也可以使用带有选择器的 publish()
来实现:
fun fetch2(): Observable<Optional<DbType>>
return fetchFromDb()
.publish
Observable.merge(it,
it.flatMap
if (shouldFetchFromApi(it.get()))
fetchFromApi()
.observeOn(schedulerProvider.io())
.doOnSuccess saveToDb(it)
.observeOn(schedulerProvider.ui())
.flatMapObservable
fetchFromDb()
else
Observable.empty()
)
【讨论】:
太棒了,谢谢!您能否举例说明使用 publish() 的含义? 示例添加了发布选择器,通过这种方式,我们将获得 fetchFromDb() 的原始结果和结果 flatMapped fetchFromDb 非常感谢您的帮助。以上是关于RxJava Observable.create 包装可观察订阅的主要内容,如果未能解决你的问题,请参考以下文章