RxJava2.1.0:在不同线程上订阅时未调用 PublishSubject onNext
Posted
技术标签:
【中文标题】RxJava2.1.0:在不同线程上订阅时未调用 PublishSubject onNext【英文标题】:RxJava2.1.0 : PublishSubject onNext not invoked when subscribed on different threads 【发布时间】:2019-02-19 23:08:16 【问题描述】:发现在Subjects上调用onNext事件后10-20ms内订阅一个序列化的PublishSubject时;新订阅者的 onNext 没有被调用。
在下面的代码sn-p中;要观察 [1] 的值被指定为“2000”,并且在对值为 1998 [2] 的主题调用 onNext() 后调用 subscribeToSubject();我们看到,如果间隔为 10 毫秒,新订阅者将错过 Subject 触发的值 2000;然而,如果间隔为 50 毫秒或更大,那么新订阅者似乎收到了预期值;这是预期的行为吗?
这种行为出现在 RxJava 2.1.0 上;似乎是某种竞争条件
public class PublishSubjectTest
private final Subject<String> singlePropertyUpdateSubject =
PublishSubject.<String>create().toSerialized();
public static void main(String[] args)
PublishSubjectTest obj= new PublishSubjectTest();
obj.sendEvents();
try
Thread.currentThread().join();
catch (InterruptedException e)
e.printStackTrace();
//[1]
private final String valueToObserve = "2000";
private void subscribeToSubject()
System.out.println("Subscribing .....");
io.reactivex.Observable.range(1,10).subscribeOn(Schedulers.newThread()).subscribe(
value -> getAndObserve(valueToObserve).subscribe(observedValue -> System.out.println(" Value Received "+observedValue +" By "+Thread.currentThread() ))
);
private io.reactivex.Observable<String> getAndObserve(String value)
final io.reactivex.Observable<String> observable = singlePropertyUpdateSubject
//.doOnNext(v-> System.out.println("Received value "+v))
.filter(v -> v.equals(value))
.doOnSubscribe(c-> System.out.println("Consumer subscribed "+c));
return observable;
// 50ms >= expected result ; Anything less than 10ms will fail.
private void sendEvents()
io.reactivex.Observable.interval(10, TimeUnit.MILLISECONDS).subscribe(value ->
String key = value.toString();
//System.out.println("Adding key "+key);
singlePropertyUpdateSubject.onNext(key);
//[2]
if (value == 1998)
subscribeToSubject();;
if (value%100==0)
System.out.println(value);
);
【问题讨论】:
好的,问题出在 PublishSubject 上;使用 BehaviourSubject / ReplaySubject 似乎可以解决这个问题;使用 Replay/Behaviour Subject 时,应用程序调用 system.exit(1) 也存在问题;详细讨论提供github.com/ReactiveX/RxJava/issues/6414 我投票决定将此问题作为离题结束,因为 OP 由于不耐烦而将其三重发布到 RxJava 问题列表中。 OP 在那里收到了足够的反馈。 是的,请关闭它;这可能不是因为不耐烦,也许是问题被立即关闭的事实;一个已关闭的问题可能不认为有一个有效的通信线程。关于上述问题;看来 BehaviourSubject 似乎确实有问题;使用 ReplaySubject 似乎已经修复了它,RxJava 列表中的反馈非常有用。我不知道如何关闭这个问题,如果可以的话我会这样做。 我会自动关闭在 SO 和 RxJava 问题列表上交叉发布的问题。 很抱歉在 SO 上交叉发布;我想我在发原始帖子时错过了 GitHub 中的描述。我可以看到您在列表中提到了它。 【参考方案1】:好的,问题出在 PublishSubject 上;使用 ReplaySubject 似乎可以解决并发订阅的问题;使用 ReplaySubject 时调用 system.exit(1) 的测试代码也存在问题;详细讨论提供 github.com/ReactiveX/RxJava/issues/6414 –
请将此问题视为已解决。
【讨论】:
以上是关于RxJava2.1.0:在不同线程上订阅时未调用 PublishSubject onNext的主要内容,如果未能解决你的问题,请参考以下文章
处理 Observable.subscribe 的结果时未调用 OnNext
为 ce-worker-0 上的线程 'Worker 0 (UUID=AXInwYzVDFskptgsIgOb) 调用了两次负载,或者上次使用时未清除状态
EventBus事件通信框架 ( 发送事件 | 判断发布线程是否是主线程 | 子线程切换主线程 | 主线程切换子线程 )