如何实现串行网络调用队列然后在 RxSwift 中处理?

Posted

技术标签:

【中文标题】如何实现串行网络调用队列然后在 RxSwift 中处理?【英文标题】:How to implement a queue of serial network calls then processing in RxSwift? 【发布时间】:2017-09-12 10:40:27 【问题描述】:

我正在开发一个应用程序,我想使用 RxSwift 和 RxCocoa 实现以下目标

    将包含 url 的 JSON 下载到 X 个文件 下载文件1,处理文件1 下载文件2,处理文件2 下载文件3,处理文件3

...等

这里的关键是每个文件的处理都必须在下载下一个文件之前完成。至少文件处理的顺序必须按顺序执行。如果我可以在文件 1 正在处理的同时开始下载文件 2,那就太棒了,但不是必需的。

我尝试使用 SerialDispatchQueueScheduler 来完成这项工作,但由于文件大小不同,每个文件的下载在不同时间完成,因此处理代码的触发顺序与我开始下载的顺序不同。

我可以通过使用 NSOperations 等在不使用 Rx 的情况下轻松实现这一点,但我想继续在这个应用程序中使用 Rx,因为它是我在这个应用程序的其他地方使用的。

下面我在一些代码中包含了一个 sn-p。为了这个问题,已经添加了评论。

       .flatMap  [unowned self] (tasks: [DiffTask]) -> Observable<ApplyDiffStatus> in
            return Observable.from(tasks)
                .observeOn(self.backgroundScheduler) // ***: backgroundScheduler is a SerialDispatchQueueScheduler
                .flatMapWithIndex( [unowned self] (task, index) in
                    return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // ***: Downloads a file from a URL
                )
                .catchError( (error) -> Observable<DictionaryUpdater.DiffTaskProgress> in
                    observable.onError(error)
                    throw error
                )
                .map( (diffTask : DiffTaskProgress) -> DiffTaskProgress.Progress in
                    // Stack Overflow: I've wrapped much of the progress observable in a Observable<UpdateProgress>
                    switch diffTask.progress 
                    case .started(currentTask: let currentTask, taskCount: let taskCount):
                        observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
                    case .finished(data: _, currentTask: let currentTask, taskCount: let taskCount):
                        observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
                    case .progress(completion: _, currentTask: let currentTask, taskCount: let taskCount):
                        observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
                    

                    return diffTask.progress
                )
                .flatMap( [unowned self] (progress: DiffTaskProgress.Progress) -> Observable<ApplyDiffStatus> in
                    switch progress 
                    case .finished(data: let data, currentTask: let currentTask, taskCount: let taskCount):
                        return self.applyDiff(data, currentTask: currentTask, taskCount: taskCount) // ***: PROCESSES THE FILE THAT WAS DOWNLOADED
                    default:
                        return Observable.empty()
                    
                )
        

【问题讨论】:

【参考方案1】:

我设法通过使用concatMap 运算符解决了这个问题。所以不是

.flatMapWithIndex( [unowned self] (task, index) in
    return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // ***: Downloads a file from a URL
)

我做了这样的事情:

tasks.enumerated().concatMap  (index, task) in
    return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count)

concatMap 操作符确保第一个 observable 在发出更多信号之前完成。我不得不使用enumerated(),因为 concatMap 没有 concatMapWithIndex,但它可以工作:)

【讨论】:

以上是关于如何实现串行网络调用队列然后在 RxSwift 中处理?的主要内容,如果未能解决你的问题,请参考以下文章

Swift 4:创建一个异步串行队列,每个作业后等待2秒钟

RxSwift - 递归 Observables?

RxSwift - 来自 Singleton 的不同类型的队列 observables

springBoot接口排队(串行执行)

如何同步 URLSession 任务的串行队列?

使用 RxSwift 同步异步网络调用