RxSwift - Observable.generate - 使用附加映射处理顺序请求
Posted
技术标签:
【中文标题】RxSwift - Observable.generate - 使用附加映射处理顺序请求【英文标题】:RxSwift - Observable.generate - Handle sequential requests with additional mapping 【发布时间】:2017-04-17 17:20:49 【问题描述】:目标
我正在尝试实现一个使用 Range 标头下载文件的服务。这允许我一次下载一大块文件。
实现 - Observable.generate()
为了为每个请求创建一个 observable 并保存我尝试使用的文件:
let downloadObservable = Observable.generate(initialState: 0, condition: $0 < fileSize , iterate: $0 + self.defaultChunkSize )
这似乎工作得很好!除了较大的文件,它似乎有一个错误。我的请求被取消了。调试后,我发现我的工作流程并没有按照我的预期工作。这是附加到上一行的其余工作流程。
.map( (startChunk) -> (Int64, Int64) in
// I determine the end chunk so I can download any size file in chunks of size X
let endChunk = (startChunk + self.defaultChunkSize > fileSize ? fileSize : startChunk + self.defaultChunkSize )
return (startChunk, endChunk)
).flatMap( [unowned self] (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
// I make the request via alamofire - UNEXPECTED FLOW HERE SEE NOTE #1
return self.makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
).flatMap( [unowned self] (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
// Upon receiving chunk response save to file
return self.saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
).flatMap( (saveResult: FileSaveChunkResult) -> Observable<Progress> in
// Update progress if successful
switch (saveResult)
case .success(let bytesSaved):
progress.completedUnitCount += bytesSaved
case .failure:
break
return Observable.just(progress)
)
注意 #1
当我运行和调试它时,我的第一个 flatMap
循环直到发出 ALL 块请求。我希望这会更加连续,我们将生成一个 observable,然后通过 flatMap
进行所有转换,然后循环回到开头。
这不是我应该如何实施的吗?
我需要对我的Observable.generate()
上的merge()
施展魔法吗?
【问题讨论】:
这个问题的答案花了一点时间,但我找到了! 【参考方案1】:我想我已经找到了这个问题的解决方案。关键是通过网络请求map
,然后concat
他们。这样做而不是使用flatMap
。 concat
操作符将等到请求发送 onCompleted 后再开始下一个请求。代码如下:
let downloadObservable = Observable.generate(initialState: 0, condition: $0 < fileSize , iterate: $0 + self.defaultChunkSize )
.map( (startChunk) -> (Int64, Int64) in
let endChunk = (startChunk + self.defaultChunkSize > fileSize ? fileSize : startChunk + self.defaultChunkSize )
return (startChunk, endChunk)
).map( [unowned self] (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
return self.makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
).concat()
.flatMap( [unowned self] (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
return self.saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
).flatMap( (saveResult: FileSaveChunkResult) -> Observable<Progress> in
if case .success(let bytesSaved) = saveResult
progress.completedUnitCount += bytesSaved
return Observable.just(progress)
)
我想出了如何将它分成 4 批。我将它展开了一点,并在代码中制作了 cmets 以提供帮助:
let generator = Observable.generate(initialState: 0, condition: $0 < fileSize , iterate: $0 + defaultChunkSize )
let chunks = generator.map( (startChunk) -> (Int64, Int64) in
let endChunk = (startChunk + defaultChunkSize > fileSize ? fileSize : startChunk + defaultChunkSize )
return (startChunk, endChunk)
)
let requests = chunks.buffer(timeSpan: 0.0, count: 4, scheduler: MainScheduler.instance)// makes batches of four item arrays.
.map (batch) -> Observable<FileChunk> in
let requests = Observable.from(batch) // spreads the four items back out.
return requests.flatMap( (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
return makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
) // start the four requests as normal.
.concat() // wait until the four requests are finished before allowing the next four to begin.
let downloadObservable = requests
.flatMap( (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
return saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
).flatMap( (saveResult: FileSaveChunkResult) -> Observable<Progress> in
if case .success(let bytesSaved) = saveResult
progress.completedUnitCount += bytesSaved
return Observable.just(progress)
)
【讨论】:
这太棒了!所以我学到的一件事是,我在 alamofire 中的会话管理器一次最多可以处理 4 个请求。使用你的 map().concat() 我确实一次只做一个请求,这可以防止我遇到的问题。但是,它也降低了我的下载速度,因为我一次只能执行一个请求。知道如何改进它以允许 4 个请求吗?我还没有尝试让它这样做,所以我现在会看看,但我想我应该回复你的答案! 您需要使用buffer
运算符。
所以我想做一些类似 .concat().buffer().flatMapWithIndex() 的事情?由于在这种情况下 buffer() 返回一个 FileChunks 数组?抱歉,完全不熟悉缓冲区。
现在是让您的代码可测试的好时机。我自己只使用过一次缓冲区,所以我不完全确定如何设置它。
给你,分成四个批次。现在唯一的缺点是整个批次必须在接下来的四个开始之前完成......以上是关于RxSwift - Observable.generate - 使用附加映射处理顺序请求的主要内容,如果未能解决你的问题,请参考以下文章