RxJava Flowable.create(),如何尊重 subscribeOn() 线程
Posted
技术标签:
【中文标题】RxJava Flowable.create(),如何尊重 subscribeOn() 线程【英文标题】:RxJava Flowable.create(), how to respect subscribeOn() thread 【发布时间】:2019-05-26 05:42:36 【问题描述】:我正在将自定义库 (dataClient
) 回调 api 包装到 RxJava Flowable。 dataClient
使用自己的线程,所以它的回调是在自己的线程上调用的。
在我的 Rx 链中,我尝试使用 .subscribeOn(Schedulers.computation())
指定计算调度程序。尽管如此,当我在我的 Rx 链上打印线程名称时,我得到了我的 dataClient
线程。
我应该怎么做,让我的 Flowable 使用.subscribeOn()
中指定的线程?
Flowable.create( emitter ->
dataClient.setCallback(object : Callback
override fun message(message: DataModel)
emitter.onNext(vehicle)
override fun done()
emitter.onComplete()
)
emitter.setCancellable
dataClient.setCallback(null)
, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.computation())
.doOnNext Log.e("DATA", Thread.currentThread().name)
.observeOn(androidSchedulers.mainThread())
.subscribe data -> Log.d("DATA", "Got data" + data.id))
【问题讨论】:
要将onNext
信号移动到另一个线程,请使用observeOn
。 subscribeOn
用于将订阅副作用移动到另一个线程,并且对拥有自己的发射线程的源几乎没有影响。
是的,observeOn
可以移动到另一个线程,但在这种情况下,subscribeOn
有什么用处?
除非setCallback
有特殊的线程需求,比如需要从UI或者一些工作线程中调用,否则没有。
【参考方案1】:
subscribeOn
调度程序确保订阅在相关线程上完成。订阅完全是发生的,它的处理方式与 observeOn
调度程序不同,后者在新线程上调度元素的发射。
Flowable.create( emitter ->
// this runs with the computation scheduler
dataClient.setCallback(object : Callback
override fun message(message: DataModel)
// this runs on the thread it's called from
emitter.onNext(vehicle)
override fun done()
// this runs on the thread it's called from
emitter.onComplete()
)
emitter.setCancellable
dataClient.setCallback(null)
, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.computation())
.doOnNext
// this runs on the thread of the onNext call
Log.e("DATA", Thread.currentThread().name)
.observeOn(AndroidSchedulers.mainThread())
.subscribe
// this runs on the main thread
data -> Log.d("DATA", "Got data" + data.id))
由于您的订阅代码没有阻塞并且不维护发射线程,因此不需要设置subscribeOn
,可以省略。它主要只对同步源有效。
【讨论】:
通常当我获得 Rx api 时,我会根据需要设置subscribeOn
。对于这个例子subscribeOn
没有任何影响,我担心当其他开发人员使用我的api时,他们也会根据他们的需要设置subscribeOn
。这导致 Rx 链在他们期望的另一个线程上运行的问题。我应该如何解决这个问题?
不要使用subscribeOn
来满足这个需求。如果它之前已经在链上调用过,无论如何它都没有任何效果。只会考虑第一次调用。以上是关于RxJava Flowable.create(),如何尊重 subscribeOn() 线程的主要内容,如果未能解决你的问题,请参考以下文章