RxJava - 即使调用 onNext() 如何阻止 PublishSubject 发布
Posted
技术标签:
【中文标题】RxJava - 即使调用 onNext() 如何阻止 PublishSubject 发布【英文标题】:RxJava - how to stop PublishSubject from publishing even if onNext() is called 【发布时间】:2019-01-25 01:43:42 【问题描述】:我查看了我正在调用以下内容:
// class member
var myPublishSubject = PublishSubject.create<SomeObservable>()
// later on in the class somewhere:
while(true)
myPublishSubject.onNext(someObservable)
我想停止发射,但让 while 循环永远继续下去。所以我希望 onNext 调用什么都不做。但我担心如果我调用 myPublishSubject.onComplete() 最终主题将为空,我将获得 NPE。即使 onNext() 被反复调用,是否还有只是为了让它静音。退订是最好的方法吗?
【问题讨论】:
我不明白你想要完成什么,也不明白while(true)
什么时候会终止。
我正在收听 zip 通话。 myobservable 发生变化的那一刻,我正在触发一个动作。但是在我完成订阅方法后,我不再需要主题。我想结束 onnext 通话但继续循环
【参考方案1】:
几点说明
这是一个非常罕见的案例,但如果您可以通过 Observable
向我们展示您的真实意图,我们可能会帮助您构建它,如果不是最好的,那就更好了。
你能做什么
对于我的示例,我只使用了一个非常简单的标志变量,可以根据项目的任何触发器进行更改。
选项 1
您可以直接在主题发布者上调用onComplete
val maxEmittedItemCount = 10
var currentEmittedItemCount = 0
val someStringValue = "Some observable" // Create whatever observable you have
val publishSubject = PublishSubject.create<String>()
publishSubject.subscribe(
currentEmittedItemCount++
println(it)
,
println(it)
)
while (currentEmittedItemCount != maxEmittedItemCount)
// Print indication that the loop is still running
println("Still looping")
// Publish value on the subject
publishSubject.onNext(someStringValue)
// Test flag for trigger
if (currentEmittedItemCount == maxEmittedItemCount) publishSubject.onComplete()
选项 2
您还可以持有对订阅的引用,然后在之后释放它,这比前一个更语义化,因为它会在资源释放时执行代码块而不调用onNext(t)
。
lateinit var disposable: Disposable // Will hold reference to the subscription
var maxEmittedItemCount = 10
var currentEmittedItemCount = 0
var someStringValue = "Some observable" // Create whatever observable you have
var publishSubject = PublishSubject.create<String>()
disposable = publishSubject.subscribeWith(object : DisposableObserver<String>()
override fun onComplete()
// Print indication of completion for the subject publisher
System.out.println("Complete")
override fun onNext(t: String)
// Test flag count synchonizer
currentEmittedItemCount++
// Print out current emitted item count
System.out.println(currentEmittedItemCount)
// Print current string
System.out.println(t)
override fun onError(e: Throwable)
// Print error
System.out.println(e)
)
while (currentEmittedItemCount != maxEmittedItemCount)
// Publish value on the subject
if (!disposable.isDisposed) publishSubject.onNext(someStringValue)
// Test flag for trigger
if (currentEmittedItemCount == maxEmittedItemCount)
publishSubject.onComplete() // optional if you need to invoke `onComplete()` block on the subject
disposable.dispose()
// Print indication that the loop is still running
System.out.println("Still looping")
了解更多
https://medium.com/@vanniktech/rxjava-2-disposable-under-the-hood-f842d2373e64 https://medium.com/@nazarivanchuk/types-of-subjects-in-rxjava-96f3a0c068e4 http://reactivex.io/RxJava/javadoc/io/reactivex/disposables/Disposable.html【讨论】:
我在您的选项 1 中担心的是,在调用 publishSubject.onComplete() 后主题将为空。然后我将在 while 循环迭代期间获得 NPE。我错了吗? 它永远不会返回 null,在您调用onComplete()
后,后台发生的事情是将订阅者的状态设置为 TERMINATED
。这是订阅者终止时的下一步检查。 if(subscribers.get() == TERMINATED) return
。基本上它告诉它只是跳过已经处理的订阅者。需要注意的一件事是,您不能同时调用 onComplete()
和 onError()
。选择一个,更多:github.com/ReactiveX/RxJava/issues/6181【参考方案2】:
由于观察者已订阅,我们必须调用 unsubscribe 以避免多次 onNext 调用。
我建议在subject.onNext()的工作完成后调用onComplete:
这是一个例子
PublishSubject<Integer> source = PublishSubject.create();
source.onNext(1);
source.onComplete();
source.subscribe(getObserver());
然后在观察者中,我们重新创建另一个 PublishSubject 实例
source.subscribe(new Observer<Boolean>()
@Override
public void onSubscribe(Disposable d)
@Override
public void onNext(Integer value)
@Override
public void onError(Throwable e)
@Override
public void onComplete()
source = PublishSubject.create();
);
希望这种方法对您的要求有所帮助
【讨论】:
以上是关于RxJava - 即使调用 onNext() 如何阻止 PublishSubject 发布的主要内容,如果未能解决你的问题,请参考以下文章
在 200 网络响应上调用 onError 而不是 onNext - RxJava,Retrofit
RxJava2.1.0:在不同线程上订阅时未调用 PublishSubject onNext
单元测试 RxJava onComplete、onNext 和 onError