RXJava2 管理订阅

Posted

技术标签:

【中文标题】RXJava2 管理订阅【英文标题】:RXJava2 manage subscriptions 【发布时间】:2018-01-31 22:38:23 【问题描述】:

我需要澄清一下使用 RxJava2 管理特定场景的最佳方法是什么(整个应用程序结构都基于它):

在我的应用程序中,很多人可以在同一个文档中进行更改,因此我需要将每个更改传递给查看该文档的每个人。但是这个对象非常复杂和沉重,所以我需要在最后一个人关闭它时将它从内存中删除。还有更多:该文档可以是另一个文档的子文档,因此父文档中的每个更改都必须发送给所有子文档。

到目前为止我所做的:我创建了一个管理器,因此每个文档请求都会发送到它。每当有人需要处理文档时,我会在地图中查看文档是否已打开。如果不是,我创建一个 BaseDocument 实例,它接收来自文档的数据和一个 PublishSubject 来分发事件并添加到该地图。然后我在 PublishSubject 上订阅用户的 Observer 以获取更改。每当用户需要更改某些内容时,它将更改发送到 BaseDocument,它进行更改并通过 onNext() 将新版本发送给每个人。到目前为止一切顺利。

我的问题是我无法控制何时有人处置文档观察者,因此我无法控制何时不再需要该文档,因此我可以保留任何未保存的更改并销毁该对象。除了“hasObservers()”之外,我找不到任何订阅列表或类似的东西,我不想添加一个计时器来轮询它以在一切完成后关闭。

我的“神奇答案”将是在最后一个订阅者处置时调用的回调,所以我可以打扫房子并将整个对象扔掉,但我找不到这样的东西。那么,我该如何管理订阅呢?

【问题讨论】:

您可以将调制后的PublishSubjectObservable 共享为doOnDisposeshare,这样您就会在最后一个观察者处置时被调用。 【参考方案1】:

其中一种方法是计算订阅和处置的数量。如果数字达到 0,则删除文档。它看起来像这样:

int numberOfSubscribers = 0;

...

public Observable<T> expose()
    return subject.asObservable()
        .doOnSubscribe(()-> numberOfSubscribers++)
        .doOnDispose(()-> 
            numberOfSubscribers--;
            if (numberOfSubscribers == 0)
              //remove the object
             
        );

当然需要在这里添加对并发问题的支持(synchronized/atomic int),这只是一个草稿。

希望这会有所帮助:)

【讨论】:

优秀。这有助于 100%。谢谢!

以上是关于RXJava2 管理订阅的主要内容,如果未能解决你的问题,请参考以下文章

RxJava2.1.0:在不同线程上订阅时未调用 PublishSubject onNext

RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )

RxJava2:如何在处置订阅者后避免InterruptibleException?

RxJava2.0的使用详解

andorid jar/库源码解析之RxJava2

Rxjava2 可连接的Observable(ConnectableObservable)操作详解及实例