RxJava2.1.0:在不同线程上订阅时未调用 PublishSubject onNext

Posted

技术标签:

【中文标题】RxJava2.1.0:在不同线程上订阅时未调用 PublishSubject onNext【英文标题】:RxJava2.1.0 : PublishSubject onNext not invoked when subscribed on different threads 【发布时间】:2019-02-19 23:08:16 【问题描述】:

发现在Subjects上调用onNext事件后10-20ms内订阅一个序列化的PublishSubject时;新订阅者的 onNext 没有被调用。

在下面的代码sn-p中;要观察 [1] 的值被指定为“2000”,并且在对值为 1998 [2] 的主题调用 onNext() 后调用 subscribeToSubject();我们看到,如果间隔为 10 毫秒,新订阅者将错过 Subject 触发的值 2000;然而,如果间隔为 50 毫秒或更大,那么新订阅者似乎收到了预期值;这是预期的行为吗?

这种行为出现在 RxJava 2.1.0 上;似乎是某种竞争条件

public class PublishSubjectTest 

    private final Subject<String> singlePropertyUpdateSubject =
            PublishSubject.<String>create().toSerialized();


    public static void main(String[] args) 
        PublishSubjectTest obj= new PublishSubjectTest();
        obj.sendEvents();
        try 
            Thread.currentThread().join();
         catch (InterruptedException e) 
            e.printStackTrace();
        

    


//[1]
    private final String valueToObserve = "2000";
    private void subscribeToSubject() 
        System.out.println("Subscribing .....");
        io.reactivex.Observable.range(1,10).subscribeOn(Schedulers.newThread()).subscribe(
                value -> getAndObserve(valueToObserve).subscribe(observedValue -> System.out.println("  Value Received   "+observedValue +" By "+Thread.currentThread() ))
        );



    

    private io.reactivex.Observable<String> getAndObserve(String value) 
        final io.reactivex.Observable<String> observable = singlePropertyUpdateSubject
                //.doOnNext(v-> System.out.println("Received value "+v))
                .filter(v -> v.equals(value))
                .doOnSubscribe(c-> System.out.println("Consumer subscribed "+c));
        return observable;
    


// 50ms >= expected result ;  Anything less than 10ms will fail.
    private void sendEvents() 
        io.reactivex.Observable.interval(10, TimeUnit.MILLISECONDS).subscribe(value -> 
            String key = value.toString();
            //System.out.println("Adding key "+key);
            singlePropertyUpdateSubject.onNext(key);
//[2]           
 if (value == 1998)
                subscribeToSubject();;
            
            if (value%100==0) 
                System.out.println(value);
            

        );
    

【问题讨论】:

好的,问题出在 PublishSubject 上;使用 BehaviourSubject / ReplaySubject 似乎可以解决这个问题;使用 Replay/Behaviour Subject 时,应用程序调用 system.exit(1) 也存在问题;详细讨论提供github.com/ReactiveX/RxJava/issues/6414 我投票决定将此问题作为离题结束,因为 OP 由于不耐烦而将其三重发布到 RxJava 问题列表中。 OP 在那里收到了足够的反馈。 是的,请关闭它;这可能不是因为不耐烦,也许是问题被立即关闭的事实;一个已关闭的问题可能不认为有一个有效的通信线程。关于上述问题;看来 BehaviourSubject 似乎确实有问题;使用 ReplaySubject 似乎已经修复了它,RxJava 列表中的反馈非常有用。我不知道如何关闭这个问题,如果可以的话我会这样做。 我会自动关闭在 SO 和 RxJava 问题列表上交叉发布的问题。 很抱歉在 SO 上交叉发布;我想我在发原始帖子时错过了 GitHub 中的描述。我可以看到您在列表中提到了它。 【参考方案1】:

好的,问题出在 PublishSubject 上;使用 ReplaySubject 似乎可以解决并发订阅的问题;使用 ReplaySubject 时调用 system.exit(1) 的测试代码也存在问题;详细讨论提供 github.com/ReactiveX/RxJava/issues/6414 –

请将此问题视为已解决。

【讨论】:

以上是关于RxJava2.1.0:在不同线程上订阅时未调用 PublishSubject onNext的主要内容,如果未能解决你的问题,请参考以下文章

RxJava 观察调用/订阅线程

客户端加入时未调用事件“连接”

处理 Observable.subscribe 的结果时未调用 OnNext

MFC 工作线程在意外关闭时未清理

为 ce-worker-0 上的线程 'Worker 0 (UUID=AXInwYzVDFskptgsIgOb) 调用了两次负载,或者上次使用时未清除状态

EventBus事件通信框架 ( 发送事件 | 判断发布线程是否是主线程 | 子线程切换主线程 | 主线程切换子线程 )