RxJava - 即使调用 onNext() 如何阻止 PublishSubject 发布

Posted

技术标签:

【中文标题】RxJava - 即使调用 onNext() 如何阻止 PublishSubject 发布【英文标题】:RxJava - how to stop PublishSubject from publishing even if onNext() is called 【发布时间】:2019-01-25 01:43:42 【问题描述】:

我查看了我正在调用以下内容:

// class member
var myPublishSubject = PublishSubject.create<SomeObservable>()
// later on in the class somewhere:    
while(true)
   myPublishSubject.onNext(someObservable)

我想停止发射,但让 while 循环永远继续下去。所以我希望 onNext 调用什么都不做。但我担心如果我调用 myPublishSubject.onComplete() 最终主题将为空,我将获得 NPE。即使 onNext() 被反复调用,是否还有只是为了让它静音。退订是最好的方法吗?

【问题讨论】:

我不明白你想要完成什么,也不明白 while(true) 什么时候会终止。 我正在收听 zip 通话。 myobservable 发生变化的那一刻,我正在触发一个动作。但是在我完成订阅方法后,我不再需要主题。我想结束 onnext 通话但继续循环 【参考方案1】:

几点说明

这是一个非常罕见的案例,但如果您可以通过 Observable 向我们展示您的真实意图,我们可能会帮助您构建它,如果不是最好的,那就更好了。

你能做什么

对于我的示例,我只使用了一个非常简单的标志变量,可以根据项目的任何触发器进行更改。

选项 1

您可以直接在主题发布者上调用onComplete

val maxEmittedItemCount = 10
var currentEmittedItemCount = 0
val someStringValue = "Some observable" // Create whatever observable you have
val publishSubject = PublishSubject.create<String>()

publishSubject.subscribe(
    currentEmittedItemCount++
    println(it)
, 
    println(it)
)

while (currentEmittedItemCount != maxEmittedItemCount) 
    // Print indication that the loop is still running
    println("Still looping")

    // Publish value on the subject
    publishSubject.onNext(someStringValue)

    // Test flag for trigger
    if (currentEmittedItemCount == maxEmittedItemCount) publishSubject.onComplete()

选项 2

您还可以持有对订阅的引用,然后在之后释放它,这比前一个更语义化,因为它会在资源释放时执行代码块而不调用onNext(t)

lateinit var disposable: Disposable // Will hold reference to the subscription
var maxEmittedItemCount = 10
var currentEmittedItemCount = 0
var someStringValue = "Some observable" // Create whatever observable you have
var publishSubject = PublishSubject.create<String>()

disposable = publishSubject.subscribeWith(object : DisposableObserver<String>() 
    override fun onComplete() 
        // Print indication of completion for the subject publisher
        System.out.println("Complete")
    

    override fun onNext(t: String) 
        // Test flag count synchonizer
        currentEmittedItemCount++

        // Print out current emitted item count
        System.out.println(currentEmittedItemCount)

        // Print current string
        System.out.println(t)
    

    override fun onError(e: Throwable) 
        // Print error
        System.out.println(e)
    
)

while (currentEmittedItemCount != maxEmittedItemCount) 
    // Publish value on the subject
    if (!disposable.isDisposed) publishSubject.onNext(someStringValue)

    // Test flag for trigger
    if (currentEmittedItemCount == maxEmittedItemCount) 
        publishSubject.onComplete() // optional if you need to invoke `onComplete()` block on the subject
        disposable.dispose()
    

    // Print indication that the loop is still running
    System.out.println("Still looping")

了解更多

https://medium.com/@vanniktech/rxjava-2-disposable-under-the-hood-f842d2373e64 https://medium.com/@nazarivanchuk/types-of-subjects-in-rxjava-96f3a0c068e4 http://reactivex.io/RxJava/javadoc/io/reactivex/disposables/Disposable.html

【讨论】:

我在您的选项 1 中担心的是,在调用 publishSubject.onComplete() 后主题将为空。然后我将在 while 循环迭代期间获得 NPE。我错了吗? 它永远不会返回 null,在您调用 onComplete() 后,后台发生的事情是将订阅者的状态设置为 TERMINATED。这是订阅者终止时的下一步检查。 if(subscribers.get() == TERMINATED) return。基本上它告诉它只是跳过已经处理的订阅者。需要注意的一件事是,您不能同时调用 onComplete()onError()。选择一个,更多:github.com/ReactiveX/RxJava/issues/6181【参考方案2】:

由于观察者已订阅,我们必须调用 unsubscribe 以避免多次 onNext 调用。

我建议在subject.onNext()的工作完成后调用onComplete

这是一个例子

PublishSubject<Integer> source = PublishSubject.create();

source.onNext(1);
source.onComplete();

source.subscribe(getObserver());

然后在观察者中,我们重新创建另一个 PublishSubject 实例

 source.subscribe(new Observer<Boolean>() 
                                        @Override
                                        public void onSubscribe(Disposable d) 

                                    

                                    @Override
                                    public void onNext(Integer value) 

                                    

                                    @Override
                                    public void onError(Throwable e) 

                                    

                                    @Override
                                    public void onComplete() 
                                        source = PublishSubject.create();
                                    
                                );

希望这种方法对您的要求有所帮助

【讨论】:

以上是关于RxJava - 即使调用 onNext() 如何阻止 PublishSubject 发布的主要内容,如果未能解决你的问题,请参考以下文章

在 200 网络响应上调用 onError 而不是 onNext - RxJava,Retrofit

RxJava2.1.0:在不同线程上订阅时未调用 PublishSubject onNext

单元测试 RxJava onComplete、onNext 和 onError

Android - 框架之RxJava的使用

RxJava使用filter时如何知道是否所有的item都被过滤了?

RxJava整合Retrofit遇到的问题总结