在转发时发出进度项会导致单个 observable

Posted

技术标签:

【中文标题】在转发时发出进度项会导致单个 observable【英文标题】:Emitting progress items while forwarding result in a single observable 【发布时间】:2018-01-19 20:04:38 【问题描述】:

我通过 REST API 分两步启动操作:

    开始操作并返回一个任务ID 使用给定 ID 轮询任务并在操作返回完成时完成序列。

轮询任务 id 将返回 202 表示操作仍在进行中,完成时返回 200。任何其他代码都是错误的。

我需要将每个步骤的响应传达给订阅者。

以前,我会让 do 运算符在步骤之间将响应推送到 ReplaySubject。

startReboot()
  .do(onNext:  response in
     operationStatus.next(response)
  )
  .flatMap( response in
     // If we could not get the task ID from the response we error
     guard let taskID = getTaskIDFromJSON(response) else  return Observable.error(API.serverError) 

     return Observable.just(taskID)
  )
  .flatMap( taskID in
      return pollTask(withID: taskID) // internally, it uses retryWhen to check the api again with a five second delay
  )
  .do(onNext:  response in
     operationStatus.next(response)
  )
  .subscribe(onError:  _ in
     showOperationFailedIcon()
  , onCompleted: 
     showOperationCompleteIcon()
  )

在其他地方,该主题的订阅者会执行以下操作:

operationStatus.subscribe(onNext:  response
    showResponse(response)
)

所以本质上我是在同时展示操作的进度和我们从每个步骤中得到的响应。

当时我对 Rx 并不熟悉,无法提出更清洁的解决方案。但是现在我已经熟悉了它,在我看来应该有一个解决方案,我们不使用副作用并将其包含在一个最终的 observable 中。不过,我还是找不到办法。

我在想这样的事情:

let opObs = startReboot()
let pollingObs = pollTask(/* where does the id come from? */)
Observable.concat(opObs, pollingObs)
   .subscribe(onNext :  response in
     showResponse(response)
   , onError:  _ in
     showOperationFailedIcon()
   , onCompleted: 
     showOperationCompleteIcon()
   )

但这意味着一旦 opObs 完成,我需要将任务 ID 保存在 monad 外部的变量中,并在启动时包装 pollingObs 以获取它 - 再次引入副作用。

是否有一个运算符或运算符组合可用于向订阅者发出每个步骤的响应并将其传递给另一个 observable?

【问题讨论】:

【参考方案1】:

这样的事情应该可以工作。请注意使用 share() 以避免触发两次 startReboot 序列。

let opObs = startReboot().share()
let pollingObs = opObs.flatMap 
    guard let taskID = getTaskIDFromJSON(response) else  return Observable.error(API.serverError)

    return pollTask(withID: taskID)

Observable.concat(opObs, pollingObs)
   .subscribe(onNext :  response in
     showResponse(response)
   , onError:  _ in
     showOperationFailedIcon()
   , onCompleted: 
     showOperationCompleteIcon()
   )

【讨论】:

以上是关于在转发时发出进度项会导致单个 observable的主要内容,如果未能解决你的问题,请参考以下文章

RxSwift。结合最新。并不是所有的 observables 都发出了

如何使用 rxswift 从数组中发出单个值?

插入多行时重复项会发生什么?

在托管 bean 构造函数中访问注入的依赖项会导致 NullPointerException

在发出两个 observable 的第一个值后,Zip 没有发出值

添加 Firestore 依赖项会导致 Flutter 应用程序出错