RxSwift - PublishSubject - 忽略错误并继续订阅(不要处置)

Posted

技术标签:

【中文标题】RxSwift - PublishSubject - 忽略错误并继续订阅(不要处置)【英文标题】:RxSwift - PublishSubject - Ignore error and continue subscription (do not dispose) 【发布时间】:2021-11-23 05:24:57 【问题描述】:

我找不到足够的例子来说明如何做到这一点。

本质上,我们有一个PublishSubject,它只是一个传递;我们没有对其进行任何操作。我们相信,如果有任何错误,我们会丢弃未来的事件,因为它只是一个转发它作为订阅信号接收的值,不知道为什么会出现错误。

Assuming if it is the case(对 Q 的第二条评论),我们如何忽略主题上的任何错误并保持订阅有效?

如果没有选项,有没有办法创建一个新订阅onError 并继续收听未来的.next()

let pubSubj = PublishSubject<String>()
let obs = pubSubj
    .observeOn(ConcurrentDispatchQueueScheduler(qos: .background))
    .catchError  (err) -> Observable<String> in
        print("***** catchError \(err)\n")
        return .never() // expecting this not to terminate the subscription
    

print("***** obs: \(obs)\n")


let dispose = obs.subscribe(onNext:  (str) in
    print("***** received onNext: \(str)\n")
, onError:  (err) in
    print("***** received onErr: \(err)\n")
, onCompleted: 
    print("***** completed\n")
, onDisposed: 
    print("***** onDisposed\n")
)
                    
pubSubj.on(.next("2"))
pubSubj.onError(RxError.overflow) //emits error and terminates
            
pubSubj.on(.next("3")) //is not received, how can we keep from getting disposed

输出:

***** obs: RxSwift.(unknown context at $1124f80f0).Catch<Swift.String>
***** received onNext: 2
***** catchError Arithmetic overflow occurred.

其他 Rx 实现似乎有 onErrorResumeNext,RxSwift 不支持。

但有答案指向 .catchError,但从我们的示例中,它仍然在错误后处理并且我们没有收到“3

【问题讨论】:

旁注,您在其中写道“发出错误并终止”。事实并非如此。请注意,onDisposed 未被调用。通过在 catch 中返回 never(),您可以确保订阅不会终止,而且它永远不会发出另一个值。 [system.reactive] 是一个 .NET 标签 谢谢@DanielT 我没有注意到区别 @Enigmativity 移除了标签,谢谢 【参考方案1】:

在响应式流中收到错误后,您将无法接收任何事件,因为该流将完成,这对于包括 RxSwift 在内的所有响应式库都是如此。 catch 将使用您从闭包返回的Observable 继续流,但外部流无法再次发出:

  let pubSubj = PublishSubject<String>()
  let recover = PublishSubject<String>()
  let obs = pubSubj
    .catch  _ in
      recover
    

  print("***** obs: \(obs)\n")


  let dispose = obs.subscribe(onNext:  (str) in
    print("***** received onNext: \(str)\n")
  , onError:  err in
    print("***** received onErr: \(err)\n")
  , onCompleted: 
    print("***** completed\n")
  , onDisposed: 
    print("***** onDisposed\n")
  )
                      
  pubSubj.onNext("2")
  pubSubj.onError(RxError.overflow)
  pubSubj.onNext("3") // Will not be sent
  recover.onNext("3") // Will be sent

您有几种选择来实现您的需求:

    不要在您的PublishSubject 中提供错误。基本上pubSubj.onError(RxError.overflow) 不应该发生。 如果您需要在PublishSubject 中提供错误,那么您可以实现它。这样,错误将作为 next 事件发送,并且流将无法完成,例如:
  let pubSubj = PublishSubject<Event<String>>()
  let obs = pubSubj

  print("***** obs: \(obs)\n")


  let dispose = obs.subscribe(onNext:  (str) in
      print("***** received onNext: \(str)\n")
  , onCompleted: 
      print("***** completed\n")
  , onDisposed: 
      print("***** onDisposed\n")
  )
                      
  pubSubj.onNext(.next("2"))
  pubSubj.onNext(.error(RxError.overflow))
  pubSubj.onNext(.next("3"))

【讨论】:

以上是关于RxSwift - PublishSubject - 忽略错误并继续订阅(不要处置)的主要内容,如果未能解决你的问题,请参考以下文章

RxSwift 中的 PublishSubject 和 PublishRelay 有啥区别?

RxSwift - PublishSubject - 忽略错误并继续订阅(不要处置)

正在处理 RxSwift PublishSubject

RxSwift:将 PublishSubject 绑定到多个按钮并接收发送者

在 RxSwift PublishSubject 上完成后订阅最后一个值

如何在按钮点击时观察 PublishSubject