RxJava Flowable.create(),如何尊重 subscribeOn() 线程

Posted

技术标签:

【中文标题】RxJava Flowable.create(),如何尊重 subscribeOn() 线程【英文标题】:RxJava Flowable.create(), how to respect subscribeOn() thread 【发布时间】:2019-05-26 05:42:36 【问题描述】:

我正在将自定义库 (dataClient) 回调 api 包装到 RxJava Flowable。 dataClient 使用自己的线程,所以它的回调是在自己的线程上调用的。

在我的 Rx 链中,我尝试使用 .subscribeOn(Schedulers.computation()) 指定计算调度程序。尽管如此,当我在我的 Rx 链上打印线程名称时,我得到了我的 dataClient 线程。

我应该怎么做,让我的 Flowable 使用.subscribeOn() 中指定的线程?

Flowable.create( emitter ->
    dataClient.setCallback(object : Callback 
        override fun message(message: DataModel) 
            emitter.onNext(vehicle)
        

        override fun done() 
            emitter.onComplete()
        
    )
    emitter.setCancellable 
        dataClient.setCallback(null)
    
, BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.computation())
    .doOnNext  Log.e("DATA", Thread.currentThread().name) 
    .observeOn(androidSchedulers.mainThread())
    .subscribe  data -> Log.d("DATA", "Got data" + data.id)) 

【问题讨论】:

要将onNext 信号移动到另一个线程,请使用observeOnsubscribeOn 用于将订阅副作用移动到另一个线程,并且对拥有自己的发射线程的源几乎没有影响。 是的,observeOn 可以移动到另一个线程,但在这种情况下,subscribeOn 有什么用处? 除非setCallback有特殊的线程需求,比如需要从UI或者一些工作线程中调用,否则没有。 【参考方案1】:

subscribeOn 调度程序确保订阅在相关线程上完成。订阅完全是发生的,它的处理方式与 observeOn 调度程序不同,后者在新线程上调度元素的发射。

Flowable.create( emitter ->
    // this runs with the computation scheduler
    dataClient.setCallback(object : Callback 
        override fun message(message: DataModel) 
            // this runs on the thread it's called from
            emitter.onNext(vehicle)
        

        override fun done() 
            // this runs on the thread it's called from
            emitter.onComplete()
        
    )
    emitter.setCancellable 
        dataClient.setCallback(null)
    
, BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.computation())
    .doOnNext 
        // this runs on the thread of the onNext call
        Log.e("DATA", Thread.currentThread().name)
    
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe 
        // this runs on the main thread
        data -> Log.d("DATA", "Got data" + data.id))
    

由于您的订阅代码没有阻塞并且不维护发射线程,因此不需要设置subscribeOn,可以省略。它主要只对同步源有效。

【讨论】:

通常当我获得 Rx api 时,我会根据需要设置subscribeOn。对于这个例子subscribeOn没有任何影响,我担心当其他开发人员使用我的api时,他们也会根据他们的需要设置subscribeOn。这导致 Rx 链在他们期望的另一个线程上运行的问题。我应该如何解决这个问题? 不要使用subscribeOn 来满足这个需求。如果它之前已经在链上调用过,无论如何它都没有任何效果。只会考虑第一次调用。

以上是关于RxJava Flowable.create(),如何尊重 subscribeOn() 线程的主要内容,如果未能解决你的问题,请参考以下文章

RxJava系列6(从微观角度解读RxJava源码)

RxJava使用详解

Android RxJava使用介绍 RxJava的操作符

RxJava核心思想(看懂再学RxJava)

Rxjava 整理(未完)

深入浅出RxJava