Combine framework serialize async operations
【Posted】: 2020-05-01 18:41:32
【Question】: How do I get the asynchronous pipelines that constitute the Combine framework to line up synchronously (serially)?
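Suppose I have 50 URLs from which I want to download the corresponding resources, and let's say I want to do it one at a time. I know how to do that with Operation / OperationQueue, e.g. using an Operation subclass that doesn't declare itself finished until the download is complete. How would I do the same thing using Combine?
At the moment all that occurs to me is to keep a global list of the remaining URLs and pop one off, set up a pipeline for that one download, do the download, and repeat in the sink of the pipeline. That doesn't seem very Combine-like.
I did try making an array of the URLs and mapping it to an array of publishers. I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloading simultaneously. There isn't any Combine way to walk the array in a controlled manner, or is there?
(I also imagined doing something with Future, but I became hopelessly confused. I'm not used to this way of thinking.)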
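For reference, the "pop one off and repeat in the sink" idea described above might look roughly like the following. This is only a sketch: the OneByOne class, its fetch closure, and the use of Just in place of a real download are all hypothetical names introduced for illustration.

```swift
import Combine

// Hypothetical helper: processes `items` one at a time by starting the next
// pipeline from inside the completion handler of the previous one.
final class OneByOne<Item, Output> {
    private var remaining: [Item]
    private let fetch: (Item) -> AnyPublisher<Output, Never>
    private let onOutput: (Output) -> Void
    private var storage = Set<AnyCancellable>()

    init(items: [Item],
         fetch: @escaping (Item) -> AnyPublisher<Output, Never>,
         onOutput: @escaping (Output) -> Void) {
        self.remaining = items
        self.fetch = fetch
        self.onOutput = onOutput
    }

    func start() {
        guard !remaining.isEmpty else { return }
        let item = remaining.removeFirst()
        fetch(item)
            .sink(receiveCompletion: { [weak self] _ in
                self?.start()   // pop the next item only when this one is done
            }, receiveValue: { [weak self] output in
                self?.onOutput(output)
            })
            .store(in: &storage)
    }
}

var received: [String] = []
let runner = OneByOne(items: [1, 2, 3],
                      fetch: { Just("item \($0)").eraseToAnyPublisher() },
                      onOutput: { received.append($0) })
runner.start()
```

It works, but the ordering logic lives outside the pipeline, which is why it does not feel very Combine-like.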
【Comments on the question】:
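【Answer 1】: In all of the other reactive frameworks this is really easy; you just use concat to concatenate and flatten the results in one step, and then you can reduce the results into a final array. Apple makes this difficult because Publishers.Concatenate has no overload that accepts an array of publishers. There is similar weirdness with Publishers.Merge. I have a feeling this has to do with the fact that they return nested generic publishers instead of just returning a single generic type like the Rx Observable. I guess you can just call Concatenate in a loop and then reduce the concatenated results into a single array, but I really hope they address this issue in the next release. There is certainly a need to concatenate more than 2 publishers and to merge more than 4 publishers (and the overloads for these two operators aren't even consistent, which is just weird).
EDIT:
I came back to this and found that you can indeed concatenate an arbitrary array of publishers, and they will emit in sequence. I have no idea why there isn't a function like ConcatenateMany to do this for you, but it looks like, as long as you are willing to use a type-erased publisher, it's not that hard to write one yourself. This example shows that merge emits in temporal order while concat emits in the order of combination: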
import PlaygroundSupport
import SwiftUI
import Combine

let p = Just<Int>(1).append(2).append(3).delay(for: .seconds(0.25), scheduler: RunLoop.main).eraseToAnyPublisher()
let q = Just<Int>(4).append(5).append(6).eraseToAnyPublisher()
let r = Just<Int>(7).append(8).append(9).delay(for: .seconds(0.5), scheduler: RunLoop.main).eraseToAnyPublisher()

// Fold the array into one publisher by appending each one to the chain so far.
let concatenated: AnyPublisher<Int, Never> = [q,r].reduce(p) { total, next in
    total.append(next).eraseToAnyPublisher()
}

var subscriptions = Set<AnyCancellable>()

concatenated
    .sink(receiveValue: { v in
        print("concatenated: \(v)")
    }).store(in: &subscriptions)

Publishers
    .MergeMany([p,q,r])
    .sink(receiveValue: { v in
        print("merge: \(v)")
    }).store(in: &subscriptions)
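The "write one yourself" remark could be sketched as a small free function. The name concatenateMany is hypothetical (Combine has no such API); the sketch assumes type erasure is acceptable and uses an Empty publisher as the starting point so that an empty array is handled too.

```swift
import Combine

// Hypothetical helper (not part of Combine): concatenates any number of
// publishers that share the same Output and Failure types.
func concatenateMany<P: Publisher>(_ publishers: [P]) -> AnyPublisher<P.Output, P.Failure> {
    // Start from an Empty publisher, which completes immediately,
    // so an empty array yields a publisher that simply finishes.
    let seed = Empty<P.Output, P.Failure>().eraseToAnyPublisher()
    return publishers.reduce(seed) { chain, next in
        chain.append(next).eraseToAnyPublisher()
    }
}

var values: [Int] = []
let cancellable = concatenateMany([Just(1), Just(2), Just(3)])
    .sink { values.append($0) }
```

Each append wraps the chain in another type-erased layer, which is fine for a few dozen publishers but is worth keeping in mind for very long arrays.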
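【Comments】:
Yes, you probably guessed that I intentionally chose a large number like 50.
There is a MergeMany. I don't understand why there isn't a ConcatenateMany. RxSwift has Observable.concat and ReactiveSwift has flatMap(.concat), so this is strange; maybe I'm missing something. I'll keep looking: developer.apple.com/documentation/combine/publishers/mergemany
Does concat serialize (in the other reactive frameworks)?
Yes. For a sequence of sequences you have only one way of flattening, i.e. putting the elements of one inner sequence after another, just like Sequence.flatMap in Swift. When you have an asynchronous sequence you have to consider the temporal dimension when flattening. So you can either emit the elements from all inner sequences in temporal order (merge), or emit the elements from each inner sequence in the order of the sequences (concat). See the marble diagrams: rxmarbles.com/#concat vs rxmarbles.com/#merge
Note that .append is an operator that creates a Publishers.Concatenate.
【Answer 2】: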
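Here is one page of playground code that depicts a possible approach. The main idea is to transform the async API calls into a chain of Future publishers, thus making a serial pipeline.
Input: a range of Int from 1 to 10 that is converted into strings asynchronously on a background queue.
Demo of calling the async API directly: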
let group = DispatchGroup()
inputValues.forEach {
    group.enter()
    asyncCall(input: $0) { (output, _) in
        print(">> \(output), in \(Thread.current)")
        group.leave()
    }
}
group.wait()
Output:
>> 1, in <NSThread: 0x7fe76264fff0>{number = 4, name = (null)}
>> 3, in <NSThread: 0x7fe762446b90>{number = 3, name = (null)}
>> 5, in <NSThread: 0x7fe7624461f0>{number = 5, name = (null)}
>> 6, in <NSThread: 0x7fe762461ce0>{number = 6, name = (null)}
>> 10, in <NSThread: 0x7fe76246a7b0>{number = 7, name = (null)}
>> 4, in <NSThread: 0x7fe764c37d30>{number = 8, name = (null)}
>> 7, in <NSThread: 0x7fe764c37cb0>{number = 9, name = (null)}
>> 8, in <NSThread: 0x7fe76246b540>{number = 10, name = (null)}
>> 9, in <NSThread: 0x7fe7625164b0>{number = 11, name = (null)}
>> 2, in <NSThread: 0x7fe764c37f50>{number = 12, name = (null)}
Demo of the Combine pipeline:
Output:
>> got 1
>> got 2
>> got 3
>> got 4
>> got 5
>> got 6
>> got 7
>> got 8
>> got 9
>> got 10
>>>> finished with true
Code:
import Cocoa
import Combine
import PlaygroundSupport

// Assuming there is some asynchronous API
// (e.g. it processes an Int input value for some time and generates a String result)
func asyncCall(input: Int, completion: @escaping (String, Error?) -> Void) {
    DispatchQueue.global(qos: .background).async {
        sleep(.random(in: 1...5)) // wait for random Async API output
        completion("\(input)", nil)
    }
}

// There are some input values to be processed serially
let inputValues = Array(1...10)

// Prepare one pipeline item based on Future, which transforms Async -> Sync
func makeFuture(input: Int) -> AnyPublisher<Bool, Error> {
    Future<String, Error> { promise in
        asyncCall(input: input) { (value, error) in
            if let error = error {
                promise(.failure(error))
            } else {
                promise(.success(value))
            }
        }
    }
    .receive(on: DispatchQueue.main)
    .map {
        print(">> got \($0)") // << side effect of pipeline item
        return true
    }
    .eraseToAnyPublisher()
}

// Create pipeline transforming input values into chain of Future publishers
var subscribers = Set<AnyCancellable>()
let pipeline =
    inputValues
    .reduce(nil as AnyPublisher<Bool, Error>?) { (chain, value) in
        if let chain = chain {
            // The next Future is only created once the previous one has emitted
            return chain.flatMap { _ in
                makeFuture(input: value)
            }.eraseToAnyPublisher()
        } else {
            return makeFuture(input: value)
        }
    }

// Execute pipeline
pipeline?
    .sink(receiveCompletion: { _ in
        // << do something on completion if needed
    }) { output in
        print(">>>> finished with \(output)")
    }
    .store(in: &subscribers)

PlaygroundPage.current.needsIndefiniteExecution = true
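One detail worth keeping in mind with this approach: a Future runs its closure as soon as it is created, not when it is subscribed to. The pipeline above avoids starting everything at once because each makeFuture call happens inside a flatMap closure. If the Futures are instead built up front and chained with append, wrapping each one in Deferred postpones the work until its turn comes. The following is a minimal sketch of that variation; asyncWork and makeDeferredFuture are hypothetical names, and the "async" work completes immediately so the example stays deterministic.

```swift
import Combine

var log: [String] = []

// Hypothetical stand-in for an async API; it calls back immediately here.
func asyncWork(_ input: Int, completion: @escaping (String) -> Void) {
    log.append("start \(input)")
    completion("\(input)")
}

// Deferred postpones creating the Future until a subscriber arrives.
func makeDeferredFuture(_ input: Int) -> AnyPublisher<String, Never> {
    Deferred {
        Future<String, Never> { promise in
            asyncWork(input) { promise(.success($0)) }
        }
    }
    .eraseToAnyPublisher()
}

// Build the whole chain up front; no work has started at this point.
let chain = (1...3)
    .map(makeDeferredFuture)
    .reduce(Empty<String, Never>().eraseToAnyPublisher()) { chain, next in
        chain.append(next).eraseToAnyPublisher()
    }

let startedBeforeSubscribing = log.count

let cancellable = chain.sink { log.append("got \($0)") }
```

Without the Deferred wrapper, all three "start" entries would be logged while the array of publishers is being built, before anything subscribes.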
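【Comments】:
【Answer 3】: I've only briefly tested this, but at first pass it appears that each request waits for the previous request to finish before starting.
I'm posting this solution in search of feedback. Please be critical if this isn't a good solution.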
extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        // If the collection is empty, we can't just create an arbitrary publisher
        // so we return nil to indicate that we had nothing to serialize.
        if isEmpty { return nil }
        // We know at this point that it's safe to grab the first publisher.
        let first = self.first!
        // If there was only a single publisher then we can just return it.
        if count == 1 { return first.eraseToAnyPublisher() }
        // We're going to build up the output starting with the first publisher.
        var output = first.eraseToAnyPublisher()
        // We iterate over the rest of the publishers (skipping over the first.)
        for publisher in self.dropFirst() {
            // We build up the output by appending the next publisher.
            output = output.append(publisher).eraseToAnyPublisher()
        }
        return output
    }
}
A more concise version of this solution (provided by @matt):
extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}
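A short usage sketch of serialize() may help. The extension is repeated so the snippet is self-contained, and Just publishers stand in for real requests (an assumption made purely for illustration); note that the optional return value means callers need the ?. on the way to sink.

```swift
import Combine

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}

var received: [Int] = []
let publishers = [Just(1), Just(2), Just(3)]

// Optional chaining: nothing is subscribed if the collection was empty.
let cancellable = publishers.serialize()?.sink { received.append($0) }

// An empty collection yields nil rather than a publisher.
let empty: [Just<Int>] = []
let nothing = empty.serialize()
```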
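【Comments】:
Excellent, thanks. append is exactly what I was looking for. Your code can be tightened considerably; in particular, there is no need to return early in the case where count == 1, because in that case dropFirst will be empty and we just won't loop. And there is no need to maintain the output variable, because we can use reduce instead of for...in. See my answer for a tighter rendering.
【Answer 4】: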
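You can create a custom subscriber whose receive method returns Subscribers.Demand.max(1). In that case the subscriber requests the next value only after it has received one. The example uses Int.publisher, but a random delay in the map mimics network traffic :-)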
import PlaygroundSupport
import SwiftUI
import Combine

class MySubscriber: Subscriber {
    typealias Input = String
    typealias Failure = Never

    func receive(subscription: Subscription) {
        print("Received subscription", Thread.current.isMainThread)
        // Ask for a single value to begin with
        subscription.request(.max(1))
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        print("Received input: \(input)", Thread.current.isMainThread)
        // Ask for one more value each time a value arrives
        return .max(1)
    }

    func receive(completion: Subscribers.Completion<Never>) {
        DispatchQueue.main.async {
            print("Received completion: \(completion)", Thread.current.isMainThread)
            PlaygroundPage.current.finishExecution()
        }
    }
}

(110...120)
    .publisher.receive(on: DispatchQueue.global())
    .map {
        print(Thread.current.isMainThread, Thread.current)
        usleep(UInt32.random(in: 10000 ... 1000000))
        return String(format: "%02x", $0)
    }
    .subscribe(on: DispatchQueue.main)
    .subscribe(MySubscriber())

print("Hello")

PlaygroundPage.current.needsIndefiniteExecution = true
The playground prints ...
Hello
Received subscription true
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 6e false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 6f false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 70 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 71 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 72 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 73 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 74 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 75 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 76 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 77 false
false <NSThread: 0x600000053400>{number = 3, name = (null)}
Received input: 78 false
Received completion: finished true
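In the subscriber above, the slow work happens upstream in map, so returning .max(1) right away is enough. If the slow work were to happen in the subscriber itself (for example, starting a download per value), one possible variation is to return .none and request the next value only when the work reports completion. The sketch below assumes exactly that; OneAtATime and its work closure are hypothetical names, and the "work" is finished manually to keep the example deterministic.

```swift
import Combine

// Hypothetical subscriber: asks for one value, then asks for the next only
// after the caller signals that work on the current value is done.
final class OneAtATime<Input>: Subscriber {
    typealias Failure = Never

    private var subscription: Subscription?
    private let work: (Input, @escaping () -> Void) -> Void

    init(work: @escaping (Input, @escaping () -> Void) -> Void) {
        self.work = work
    }

    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.max(1))              // initial demand: one value
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        work(input) { [weak self] in
            self?.subscription?.request(.max(1))   // ready for the next value
        }
        return .none                               // no extra demand right now
    }

    func receive(completion: Subscribers.Completion<Never>) {
        subscription = nil
    }
}

var received: [Int] = []
var pendingDone: [() -> Void] = []

let subscriber = OneAtATime<Int> { value, done in
    received.append(value)
    pendingDone.append(done)   // pretend the work finishes later
}
(1...3).publisher.subscribe(subscriber)

let afterSubscribe = received  // only the first value has been delivered

// Finish each work item in turn; each call releases the next value.
while !pendingDone.isEmpty {
    pendingDone.removeFirst()()
}
```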
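UPDATE
Finally I found .flatMap(maxPublishers: ), which prompts me to update this interesting topic with a slightly different approach. Please note that I am using a global queue for scheduling, not just some random delay, to make sure that receiving a serialized stream is not "random" or "lucky" behavior :-)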
import PlaygroundSupport
import Combine
import Foundation

PlaygroundPage.current.needsIndefiniteExecution = true

// A: at most one inner publisher is active at a time
let A = (1 ... 9)
    .publisher
    .flatMap(maxPublishers: .max(1)) { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: DispatchQueue.global())
            }
    }
    .sink { value in
        print(value, "A")
    }

// B: default maxPublishers (.unlimited), so all inner publishers run at once
let B = (1 ... 9)
    .publisher
    .flatMap { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: RunLoop.main)
            }
    }
    .sink { value in
        print(" ",value, "B")
    }
prints
1 A
4 B
5 B
7 B
1 B
2 B
8 B
6 B
2 A
3 B
9 B
3 A
4 A
5 A
6 A
7 A
8 A
9 A
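Based on what is written here,
.serialize()?
as defined in the accepted answer by Clay Ellis can be replaced by
.publisher.flatMap(maxPublishers: .max(1)){$0}
while the "unserialized" version must use
.publisher.flatMap{$0}
"Real-world example"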
import PlaygroundSupport
import Foundation
import Combine

let path = "postman-echo.com/get"
let urls: [URL] = "... which proves the downloads are happening serially .-)".map(String.init).compactMap { (parameter) in
    var components = URLComponents()
    components.scheme = "https"
    components.path = path
    components.queryItems = [URLQueryItem(name: parameter, value: nil)]
    return components.url
}
//["https://postman-echo.com/get?]

struct Postman: Decodable {
    var args: [String: String]
}

let collection = urls.compactMap { value in
    URLSession.shared.dataTaskPublisher(for: value)
        .tryMap { data, response -> Data in
            return data
        }
        .decode(type: Postman.self, decoder: JSONDecoder())
        .catch { _ in
            Just(Postman(args: [:]))
        }
}

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}

// A: unrestricted flatMap, responses arrive in whatever order they finish
var streamA = ""
let A = collection
    .publisher.flatMap{$0}
    .sink(receiveCompletion: { (c) in
        print(streamA, " ", c, " .publisher.flatMap{$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamA)
    })

// C: serialized by chaining with append
var streamC = ""
let C = collection
    .serialize()?
    .sink(receiveCompletion: { (c) in
        print(streamC, " ", c, " .serialize()?")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamC)
    })

// D: serialized by limiting flatMap to one inner publisher at a time
var streamD = ""
let D = collection
    .publisher.flatMap(maxPublishers: .max(1)){$0}
    .sink(receiveCompletion: { (c) in
        print(streamD, " ", c, " .publisher.flatMap(maxPublishers: .max(1)){$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamD)
    })

PlaygroundPage.current.needsIndefiniteExecution = true
prints
.w.h i.c hporves ht edownloadsa erh appeninsg eriall y.-) finished .publisher.flatMap{$0}
... which proves the downloads are happening serially .-) finished .publisher.flatMap(maxPublishers: .max(1)){$0}
... which proves the downloads are happening serially .-) finished .serialize()?
In my opinion it is very useful in other scenarios as well. Try using the default value of maxPublishers in the next snippet and compare the results :-)
import Combine

let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()

let handle = subject
    .zip(sequencePublisher.print())
    //.publish
    .flatMap(maxPublishers: .max(1), { (pair) in
        Just(pair)
    })
    .print()
    .sink { letters, digits in
        print(letters, digits)
    }

"Hello World!".map(String.init).forEach { (s) in
    subject.send(s)
}
subject.send(completion: .finished)
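【Comments】:
@matt sink doesn't do anything different, it just returns Subscribers.Demand.unlimited on receive ... Maybe using the proper tool, like a serial queue and Data.init?(contentsOf url: URL), is the best option in your scenario. If you need to sum two Ints, do you do it as [lhs: Int, rhs: Int].reduce .... ??? I would use Data.init?(contentsOf url: URL) inside receive(_ input:) of MySerialDownloaderSubscriber.
@matt please see the updated answer. Combine is exciting, but (at least for me) very hard to understand ...
Yes, I see! With the maxPublishers parameter, we get to add back pressure. This goes with what I said in my question: "I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloading simultaneously." Well, with the maxPublishers parameter, they are not simultaneous.
@matt yes, sink calls the publisher's own subscriber with Subscribers.Demand.unlimited; flatMap has the same effect as setting the publisher's own subscriber with a different value, in our use case .max(1). I just added another example with a different scenario, where it is very useful.
【Answer 5】: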
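From the original question:
I did try making an array of the URLs and mapping it to an array of publishers. I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloading simultaneously. There isn't any Combine way to walk the array in a controlled manner, or is there?
Here's a toy example to stand in for the real problem: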
let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap() {$0}
    .sink {print($0)}.store(in:&self.storage)
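This emits the integers from 1 to 10 in random order, arriving at random times. The goal is to do something with collection that will cause it to emit the integers from 1 to 10 in order.
Now we're going to change just one thing: in the line
.flatMap {$0}
we add the maxPublishers parameter: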
let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap(maxPublishers:.max(1)) {$0}
    .sink {print($0)}.store(in:&self.storage)
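Presto, we now do emit the integers from 1 to 10 in order, with random intervals between them.
Let's apply this to the original problem. To demonstrate, I need a fairly slow Internet connection and a fairly large resource to download. First, I'll do it with ordinary .flatMap: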
let eph = URLSessionConfiguration.ephemeral
let session = URLSession(configuration: eph)
let url = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let collection = [url, url, url]
    .map {URL(string:$0)!}
    .map {session.dataTaskPublisher(for: $0)
        .eraseToAnyPublisher()
    }
collection.publisher.setFailureType(to: URLError.self)
    .handleEvents(receiveOutput: {_ in print("start")})
    .flatMap() {$0}
    .map {$0.data}
    .sink(receiveCompletion: {comp in
        switch comp {
        case .failure(let err): print("error", err)
        case .finished: print("finished")
        }
    }, receiveValue: {_ in print("done")})
    .store(in:&self.storage)
The result is
start
start
start
done
done
done
finished
which shows that we are doing the three downloads simultaneously. Okay, now change
.flatMap() {$0}
to
.flatMap(maxPublishers:.max(1)) {$0}
The result now is:
start
done
start
done
start
done
finished
So we are now downloading serially, which is the problem originally to be solved.
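The same operator also works when the per-URL publisher is created inside the flatMap closure instead of being mapped into an array beforehand. The sketch below wraps that in a small generic function; serially and downloadSerially are hypothetical names, and the URLSession variant is shown only as a possible use and is not executed here.

```swift
import Combine
import Foundation

// Hypothetical helper: runs `transform` for one input at a time, in order.
func serially<Input, P: Publisher>(
    _ inputs: [Input],
    _ transform: @escaping (Input) -> P
) -> AnyPublisher<P.Output, P.Failure> {
    inputs.publisher
        .setFailureType(to: P.Failure.self)
        // maxPublishers: .max(1) means the next input is not requested
        // until the current inner publisher has completed.
        .flatMap(maxPublishers: .max(1), transform)
        .eraseToAnyPublisher()
}

// Possible use with URLSession (not executed in this sketch).
func downloadSerially(_ urls: [URL], session: URLSession = .shared) -> AnyPublisher<Data, URLError> {
    serially(urls) { url in
        session.dataTaskPublisher(for: url).map { $0.data }
    }
}

// Deterministic demo with Just standing in for a download.
var received: [Int] = []
let cancellable = serially([1, 2, 3]) { Just($0 * 10) }
    .sink { received.append($0) }
```

Because the data task publisher for a URL is only created when flatMap asks for that URL, nothing about later downloads exists until the earlier ones have finished.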
append
In keeping with the principle of TIMTOWTDI, we can instead chain the publishers with append to serialize them:
let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
let pub = collection.dropFirst().reduce(collection.first!) {
    return $0.append($1).eraseToAnyPublisher()
}
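The result is a publisher that serializes the delayed publishers in the original collection. Let's prove it by subscribing to it:
pub.sink {print($0)}.store(in:&self.storage)
Sure enough, the integers now arrive in order (with random intervals between them).
We can encapsulate the creation of pub from a collection of publishers with an extension on Collection, as suggested by Clay Ellis: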
extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}
【Comments】:
【Answer 6】: Use flatMap(maxPublishers:transform:) with .max(1), e.g.
func imagesPublisher(for urls: [URL]) -> AnyPublisher<UIImage, URLError> {
    Publishers.Sequence(sequence: urls.map { self.imagePublisher(for: $0) })
        .flatMap(maxPublishers: .max(1)) { $0 }
        .eraseToAnyPublisher()
}
Where
func imagePublisher(for url: URL) -> AnyPublisher<UIImage, URLError> {
    URLSession.shared.dataTaskPublisher(for: url)
        .compactMap { UIImage(data: $0.data) }
        .receive(on: RunLoop.main)
        .eraseToAnyPublisher()
}
and
var imageRequests: AnyCancellable?

func fetchImages() {
    imageRequests = imagesPublisher(for: urls).sink { completion in
        switch completion {
        case .finished:
            print("done")
        case .failure(let error):
            print("failed", error)
        }
    } receiveValue: { image in
        // do whatever you want with the images as they come in
    }
}
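That resulted in:
[image]
But we should recognize that you take a big performance hit doing them sequentially like that. For example, if I bump it up to 6 at a time, it's more than twice as fast:
[image]
Personally, I'd recommend downloading sequentially only if you absolutely must (which, when downloading a series of images/files, is almost certainly not the case). Yes, performing requests concurrently can result in them not finishing in a particular order, but we just use a structure that is order independent (e.g. a dictionary rather than a simple array), and the performance gains are so significant that it's generally worth it.
But, if you want them downloaded sequentially, the maxPublishers parameter can achieve that.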
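The "6 at a time, collected into a dictionary" suggestion could be sketched as follows. This is an illustration under assumptions, not the answer's actual code: fetchAll, its limit parameter, and the fetch closure are hypothetical names, and Just stands in for a network request so the demo is deterministic.

```swift
import Combine

// Hypothetical helper: runs up to `limit` fetches at once and gathers the
// results in a dictionary keyed by the input, so arrival order is irrelevant.
func fetchAll<Key: Hashable, P: Publisher>(
    _ keys: [Key],
    limit: Int,
    fetch: @escaping (Key) -> P
) -> AnyPublisher<[Key: P.Output], P.Failure> {
    keys.publisher
        .setFailureType(to: P.Failure.self)
        .flatMap(maxPublishers: .max(limit)) { key in
            // Pair every output with the key it belongs to.
            fetch(key).map { (key, $0) }
        }
        .collect()
        // If a key produces several outputs, keep the first one.
        .map { pairs in Dictionary(pairs, uniquingKeysWith: { first, _ in first }) }
        .eraseToAnyPublisher()
}

var result: [String: Int] = [:]
let cancellable = fetchAll(["a", "bb", "ccc"], limit: 6) { Just($0.count) }
    .sink { result = $0 }
```

Setting limit to 1 gives the strictly sequential behavior again, so the same function covers both cases.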
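【Comments】:
Yes, that's what my answer already says: ***.com/a/59889993/341994 as well as the answer I awarded the bounty to, ***.com/a/59889174/341994
And see also now my book apeth.com/UnderstandingCombine/operators/…
By the way, speaking of sequentially, I have made great use of your sequential asynchronous Operation for a different task, thanks for writing it.
@matt - Lol. I confess that I didn't see that you had found the maxPublishers option. And I wouldn't have droned on about "don't do serial" if I'd noticed it was you (as I know you completely understand the pros and cons of serial vs concurrent). I literally only saw "I want to download one file at a time"; I had recently stumbled across the maxPublishers option for something else I was doing (namely, providing a modern solution to this question), and I thought I'd share the Combine solution I had come up with. I didn't mean to be so derivative.
Yes, it was the solution referred to at ***.com/a/48104095/1271826 that I was talking about earlier; I found that very helpful.
【Answer 7】: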
What about a dynamic array of URLs, something like a data bus?
// Pseudocode: Task() stands for some publisher of type AnyPublisher<Data, URLError>
var array: [AnyPublisher<Data, URLError>] = []

array.append(Task())

array.publisher
    .flatMap { $0 }
    .sink {
    }
    // it will be finished

array.append(Task())
array.append(Task())
array.append(Task())
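As the comment in the pseudocode notes, array.publisher finishes after emitting the elements the array held at that moment, so later appends are never seen. One possible direction, offered here only as a sketch, is to replace the array with a subject that work can be sent to at any time. A PassthroughSubject drops values sent while downstream demand is zero, so this sketch places a buffer in front of flatMap(maxPublishers: .max(1)); the buffer settings are an assumption chosen for illustration, and Just stands in for a real download publisher.

```swift
import Combine

// A subject acts as the "bus": work can be sent at any time.
let bus = PassthroughSubject<Int, Never>()
var received: [String] = []

let cancellable = bus
    // Assumption: an effectively unbounded buffer is acceptable here, so
    // values sent while a task is still running are held rather than dropped.
    .buffer(size: Int.max, prefetch: .byRequest, whenFull: .dropNewest)
    .flatMap(maxPublishers: .max(1)) { value in
        Just("task \(value)")   // hypothetical stand-in for a download publisher
    }
    .sink { received.append($0) }

bus.send(1)
bus.send(2)
bus.send(3)
```

With real asynchronous inner publishers the buffering behavior deserves its own testing before relying on it.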
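【Comments】:
【Answer 8】: Another approach, if you want to collect all the results of the downloads in order to know which ones failed and which ones did not, is to write a custom publisher that looks like this: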
extension Publishers {
    struct Serialize<Upstream: Publisher>: Publisher {
        typealias Output = [Result<Upstream.Output, Upstream.Failure>]
        typealias Failure = Never

        let upstreams: [Upstream]

        init<C: Collection>(_ upstreams: C) where C.Element == Upstream {
            self.upstreams = Array(upstreams)
        }

        init(_ upstreams: Upstream...) {
            self.upstreams = upstreams
        }

        func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
            guard let first = upstreams.first else { return Empty().subscribe(subscriber) }
            first
                .map { Result<Upstream.Output, Upstream.Failure>.success($0) }
                .catch { Just(Result<Upstream.Output, Upstream.Failure>.failure($0)) }
                .map { [$0] }
                // "Recursive" step: serialize the rest of the list after the first
                .append(Serialize(upstreams.dropFirst()))
                .collect()
                .map { $0.flatMap { $0 } }
                .subscribe(subscriber)
        }
    }
}

extension Collection where Element: Publisher {
    func serializedPublishers() -> Publishers.Serialize<Element> {
        .init(self)
    }
}
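The publisher takes the first download task, converts its output/failure to a Result instance, and prepends it to the "recursive" call for the rest of the list.
Usage: Publishers.Serialize(listOfDownloadTasks), or listOfDownloadTasks.serializedPublishers().
One minor inconvenience of this implementation is that the Result instance needs to be wrapped in an array, just to be flattened three steps later in the pipeline. Perhaps someone can suggest a better alternative.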
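One possible alternative that avoids the wrap-then-flatten step, sketched here under assumptions rather than offered as a definitive replacement, is to convert each publisher to a never-failing publisher of Result values and let flatMap(maxPublishers: .max(1)) do the sequencing. serializedResults and DownloadError are hypothetical names, and Just and Fail stand in for real download tasks.

```swift
import Combine

struct DownloadError: Error, Equatable {}   // hypothetical error type for the demo

extension Collection where Element: Publisher {
    // Hypothetical alternative: wrap every output or failure in a Result,
    // run the publishers one at a time, and gather everything at the end.
    func serializedResults() -> AnyPublisher<[Result<Element.Output, Element.Failure>], Never> {
        Array(self).publisher
            .flatMap(maxPublishers: .max(1)) { publisher in
                publisher
                    .map { Result<Element.Output, Element.Failure>.success($0) }
                    .catch { Just(Result<Element.Output, Element.Failure>.failure($0)) }
            }
            .collect()
            .eraseToAnyPublisher()
    }
}

let tasks: [AnyPublisher<Int, DownloadError>] = [
    Just(1).setFailureType(to: DownloadError.self).eraseToAnyPublisher(),
    Fail<Int, DownloadError>(error: DownloadError()).eraseToAnyPublisher(),
    Just(3).setFailureType(to: DownloadError.self).eraseToAnyPublisher()
]

var results: [Result<Int, DownloadError>] = []
let cancellable = tasks.serializedResults().sink { results = $0 }
```

A failure in one task becomes a .failure entry instead of ending the pipeline, so the remaining tasks still run.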
【Comments】: