RxSwift - 将序列拆分为更小的序列以连接

Posted

技术标签:

【中文标题】RxSwift - 将序列拆分为更小的序列以连接【英文标题】:RxSwift - Split up sequence into smaller sequences to concat on 【发布时间】:2017-04-20 18:02:51 【问题描述】:

背景

我正在尝试使用 Range 标头通过块下载文件。

目标

我想将大量的 http 请求序列分成四个序列,然后我可以将它们连接起来以一次处理 4 个请求。

当前排名

我目前正在执行我的序列并使用concat 来确保在我开始第二个请求之前完成第一个可观察请求。这样做是为了确保我不会因过多的请求而使 Alamofire 过载,从而导致请求超时。

理想情况下,我想将我的序列分成四个相当相等的序列,因为 Alamofire 被设置为一次处理与主机的四个连接。我想这样做是因为我相信它会提高我的下载速度。

使用块下载文件

Observable.generate(initialState: 0, condition:  $0 < fileSize , iterate: $0 + self.defaultChunkSize)
    .map(  (startChunk) in
        let endChunk = startChunk + self.defaultChunkSize > fileSize ? fileSize : startChunk + self.defaultChunkSize

        return (startChunk, endChunk)
    )
    .map( (startChunk: Int, endChunk: Int) -> Observable<FileChunkResult> in
        self.filesClient.downloadChunkOf(fileId: file.id, startChunk: Int64(startChunk), endChunk: Int64(endChunk))
    )

    .concat() // <----- This is where I am forcing the large sequence to do one observable at a time

    .flatMap(  (result: FileChunkResult) -> Observable<FileSaveChunkResult> in
        switch (result) 
        case FileChunkResult.success(let chunkData):
            return self.saveChunkToFile(fileChunk: chunkData, location: urlToSaveTo)
        case FileChunkResult.failure: // Maybe change this to just default and return Observable.just(FileSaveChunkResult.failure)
            break
        case FileChunkResult.parserError:
            break
        

        return Observable.just(FileSaveChunkResult.failure)
    )
    .flatMap(  (result: FileSaveChunkResult) -> Observable<Progress> in
        switch (result) 
        case FileSaveChunkResult.success(let bytesSaved):
            progress.completedUnitCount += bytesSaved
        case FileSaveChunkResult.failure:
            break
        

        return Observable.just(progress)
    )

【问题讨论】:

仅供参考。 (1) AlamoFire 中没有任何东西会限制您一次处理四个请求,尽管您的服务器可能有某些东西会导致这种限制。 (2) 以 1/4 最大速度并行上传的四个请求不太可能比每个以全速顺序上传的四个请求更快地完成。 【参考方案1】:

下面的代码会将块分成四个大小相等的数组,这些数组使用 concat 确保每个数组一次只有一个保存处于活动状态。这意味着无论任何特定呼叫的速度有多快或多慢,您都会在任何时候始终有 4 个 saveChunkToFile 呼叫处于活动状态。

换句话说,它立即启动四个请求,然后每次之前的请求之一完成时启动一个请求。

let generator = Observable.generate(initialState: 0, condition:  $0 < fileSize , iterate:  $0 + defaultChunkSize )
let chunks  = generator.map(  (startChunk) -> (Int64, Int64) in
    let endChunk = (startChunk + defaultChunkSize > fileSize ? fileSize : startChunk + defaultChunkSize )
    return (startChunk, endChunk)
)

let count = ceil(Double(fileSize) / Double(defaultChunkSize) / 4)
let requests = chunks.window(timeSpan: 0.0, count: Int(count), scheduler: MainScheduler.instance)
    .flatMap  $0
        .map(  (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
            return makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
        ).concat()


let downloadObservable = requests
    .flatMap(  (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
        return saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
    ).flatMap(  (saveResult: FileSaveChunkResult) -> Observable<Progress> in
        if case .success(let bytesSaved) = saveResult 
            progress.completedUnitCount += bytesSaved
        
        return Observable.just(progress)
    )

_ = downloadObservable.subscribe(onNext:  print(Date(), $0) )

【讨论】:

以上是关于RxSwift - 将序列拆分为更小的序列以连接的主要内容,如果未能解决你的问题,请参考以下文章

将多维数组拆分为更小的数组

我可以按元素属性将列表拆分为更小的列表吗

putExtra:我应该如何处理大型数组?数据库或拆分为更小的阵列?

如何使用 JavaScript 将长数组拆分为更小的数组

如何使用JavaScript将长数组拆分为更小的数组

动态规划