RxSwift - 确定 Observable 是不是已被释放

Posted

技术标签:

【中文标题】RxSwift - 确定 Observable 是不是已被释放【英文标题】:RxSwift - Determining whether an Observable has been disposedRxSwift - 确定 Observable 是否已被释放 【发布时间】:2021-11-29 20:11:56 【问题描述】:

我正在尝试获取Publisher,它将Observables 出售给它的客户Consumer,以确定它的一位消费者何时处置了它的Observable

烦人。我的代码运行良好,直到我从 Consumer 代码中删除了 RxSwift .debug

有没有其他方法可以让我正常工作?

private class Subscriber 
    var ids: [Int]
    // This property exists so I can watch whether the observable has 
    // gone nil (which I though would happen when its been disposed, but it 
    // seems to happen immediately)
    weak var observable: Observable<[Updates]>?

class Publisher 
    private let relay: BehaviorRelay<[Int: Updates]> 
    private var subscribers: [Subscriber] = []

    func updatesStream(for ids: [Int]) -> Observable<[Updates]> 
        let observable = relay
           .map  map in
               return map
                   .filter  ids.contains($0.key) 
                   .map  $0.value 
           
           .filter  !$0.isEmpty 
           .asObservable()

        let subscriber = Subscriber(ids: ids, observable: observable)
        subscribers.append(subscriber)
        return observable
    

    private func repeatTimer() 
        let updates: [Updates] = ....

        // I need to be able to determine at this point whether the subscriber's 
        // observable has been disposed of, so I can remove it from the list of
        // subscribers. However `subscriber.observable` is always nil here.
        // PS: I am happy for this to happen before the `repeatTimer` func fires
        subscribers.remove(where:  subscriber in
            return subscriber.observable == nil
        )

        relay.accept(updates)
    


class Client 

    private var disposeBag: DisposeBag?
    private let publisher = Publisher()

    func startWatching() 
        let disposeBag = DisposeBag()
        self.disposeBag = disposeBag

        publisher
            // with the `.debug` below things work OK, without it the 
            ///`Publisher.Subscriber.observable` immediately becomes nil
            //.debug("Consumer") 
            .updatesStream(for: [1, 2, 3])
            .subscribe(onNext:  values in
                print(values)
            )
           .disposed(by: disposeBag)
    

    func stopWatching() 
        disposeBag = nil
    

【问题讨论】:

您应该学会接受 FRP 的功能特性。可观察类型应始终使用let 声明,从不使用var,尤其是弱变量。我不确定您调用 repeatTimer 的上下文,但该函数似乎与 Observables 的使用方式背道而驰……也许如果您提供更多上下文,我们中的一个可以帮助您构建更适合的抽象范式。 感谢您的回复。 repeatTimer 实际上并不是生产代码中的东西,我只是添加了这个来证明外部更新可以在随机时间和未知来源出现。虽然我尊重您采用 FRP 的建议,但不幸的是,这不是我可以在这种情况下更改的代码。 这里的问题是 Observables 永远不会被释放。订阅被释放,任何特定的 Observable 上都可能有很多订阅。因此,即使问“一个 Observable 是否已被处理”也没有任何意义。同样,我们需要更多上下文来提供解决方案。 【参考方案1】:

我认为这是一个非常糟糕的主意,但它解决了所要求的问题...如果我必须将此代码放在我的一个项目中,我会非常担心竞争条件...

struct Subscriber 
    let ids: [Int]
    var subscribeCount: Int = 0
    let lock = NSRecursiveLock()


class Publisher 
    private let relay = BehaviorRelay<[Int: Updates]>(value: [:])
    private var subscribers: [Subscriber] = []

    func updatesStream(for ids: [Int]) -> Observable<[Updates]> 
        var subscriber = Subscriber(ids: ids)
        let observable = relay
            .map  map in
                return map
                    .filter  ids.contains($0.key) 
                    .map  $0.value 
            
            .filter  !$0.isEmpty 
            .do(
                onSubscribe: 
                    subscriber.lock.lock()
                    subscriber.subscribeCount += 1
                    subscriber.lock.unlock()
                ,
                onDispose: 
                    subscriber.lock.lock()
                    subscriber.subscribeCount -= 1
                    subscriber.lock.unlock()
                )
                .asObservable()

                subscribers.append(subscriber)
                return observable
                

    private func repeatTimer() 
        subscribers.removeAll(where:  subscriber in
            subscriber.subscribeCount == 0
        )
    

【讨论】:

以上是关于RxSwift - 确定 Observable 是不是已被释放的主要内容,如果未能解决你的问题,请参考以下文章

RxSwift:延迟 observable 直到另一个 observable 完成?

RxSwift - 一个 Observable 中的多个 Observable 值

Observable 的 RxSwift 用途

如何检查 RXSwift 中的 observable 是不是为空?

RxSwift:如何让一个 observable 触发另一个?

如何在 RxSwift 中取消订阅 Observable?