RxSwift - Observable.generate - 使用附加映射处理顺序请求

Posted

技术标签:

【中文标题】RxSwift - Observable.generate - 使用附加映射处理顺序请求【英文标题】:RxSwift - Observable.generate - Handle sequential requests with additional mapping 【发布时间】:2017-04-17 17:20:49 【问题描述】:

目标

我正在尝试实现一个使用 Range 标头下载文件的服务。这允许我一次下载一大块文件。

实现 - Observable.generate()

为了为每个请求创建一个 observable 并保存我尝试使用的文件:

let downloadObservable = Observable.generate(initialState: 0, condition:  $0 < fileSize , iterate:  $0 + self.defaultChunkSize )

这似乎工作得很好!除了较大的文件,它似乎有一个错误。我的请求被取消了。调试后,我发现我的工作流程并没有按照我的预期工作。这是附加到上一行的其余工作流程。

.map(  (startChunk) -> (Int64, Int64) in
    // I determine the end chunk so I can download any size file in chunks of size X
    let endChunk = (startChunk + self.defaultChunkSize > fileSize ? fileSize : startChunk + self.defaultChunkSize )

    return (startChunk, endChunk)
).flatMap(  [unowned self] (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
    // I make the request via alamofire - UNEXPECTED FLOW HERE SEE NOTE #1
    return self.makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)

).flatMap(  [unowned self] (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
    // Upon receiving chunk response save to file
    return self.saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)

).flatMap(  (saveResult: FileSaveChunkResult) -> Observable<Progress> in
    // Update progress if successful
    switch (saveResult) 
    case .success(let bytesSaved):
        progress.completedUnitCount += bytesSaved
    case .failure:
        break
    

    return Observable.just(progress)
)

注意 #1

当我运行和调试它时,我的第一个 flatMap 循环直到发出 ALL 块请求。我希望这会更加连续,我们将生成一个 observable,然后通过 flatMap 进行所有转换,然后循环回到开头。

这不是我应该如何实施的吗?

我需要对我的Observable.generate() 上的merge() 施展魔法吗?

【问题讨论】:

这个问题的答案花了一点时间,但我找到了! 【参考方案1】:

我想我已经找到了这个问题的解决方案。关键是通过网络请求map,然后concat 他们。这样做而不是使用flatMapconcat 操作符将等到请求发送 onCompleted 后再开始下一个请求。代码如下:

let downloadObservable = Observable.generate(initialState: 0, condition:  $0 < fileSize , iterate:  $0 + self.defaultChunkSize )
    .map(  (startChunk) -> (Int64, Int64) in
        let endChunk = (startChunk + self.defaultChunkSize > fileSize ? fileSize : startChunk + self.defaultChunkSize )
        return (startChunk, endChunk)
    ).map(  [unowned self] (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
        return self.makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
    ).concat()
    .flatMap(  [unowned self] (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
        return self.saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
    ).flatMap(  (saveResult: FileSaveChunkResult) -> Observable<Progress> in
        if case .success(let bytesSaved) = saveResult 
            progress.completedUnitCount += bytesSaved
        
        return Observable.just(progress)
    )

我想出了如何将它分成 4 批。我将它展开了一点,并在代码中制作了 cmets 以提供帮助:

let generator = Observable.generate(initialState: 0, condition:  $0 < fileSize , iterate:  $0 + defaultChunkSize )
let chunks  = generator.map(  (startChunk) -> (Int64, Int64) in
    let endChunk = (startChunk + defaultChunkSize > fileSize ? fileSize : startChunk + defaultChunkSize )
    return (startChunk, endChunk)
)
let requests = chunks.buffer(timeSpan: 0.0, count: 4, scheduler: MainScheduler.instance)// makes batches of four item arrays.
    .map  (batch) -> Observable<FileChunk> in
        let requests = Observable.from(batch) // spreads the four items back out.
        return requests.flatMap(  (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
            return makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
        ) // start the four requests as normal.
    .concat() // wait until the four requests are finished before allowing the next four to begin.

let downloadObservable = requests
    .flatMap(  (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
        return saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
    ).flatMap(  (saveResult: FileSaveChunkResult) -> Observable<Progress> in
        if case .success(let bytesSaved) = saveResult 
            progress.completedUnitCount += bytesSaved
        
        return Observable.just(progress)
    )

【讨论】:

这太棒了!所以我学到的一件事是,我在 alamofire 中的会话管理器一次最多可以处理 4 个请求。使用你的 map().concat() 我确实一次只做一个请求,这可以防止我遇到的问题。但是,它也降低了我的下载速度,因为我一次只能执行一个请求。知道如何改进它以允许 4 个请求吗?我还没有尝试让它这样做,所以我现在会看看,但我想我应该回复你的答案! 您需要使用buffer 运算符。 所以我想做一些类似 .concat().buffer().flatMapWithIndex() 的事情?由于在这种情况下 buffer() 返回一个 FileChunks 数组?抱歉,完全不熟悉缓冲区。 现在是让您的代码可测试的好时机。我自己只使用过一次缓冲区,所以我不完全确定如何设置它。 给你,分成四个批次。现在唯一的缺点是整个批次必须在接下来的四个开始之前完成......

以上是关于RxSwift - Observable.generate - 使用附加映射处理顺序请求的主要内容,如果未能解决你的问题,请参考以下文章

RxSwift

RXSwift的一些基本交互(OC,Swift,RXSwift对比)

如何正确安装“RxSwift”模块?

RxSwift + 用户默认值

给 iOS 开发者的 RxSwift

RxSwift + Moya + ObjectMapper