Combine framework serialize async operations


【Title】: Combine framework serialize async operations 【Posted】: 2020-05-01 18:41:32 【Question】:

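How do I get the asynchronous pipelines that constitute the Combine framework to line up synchronously (serially)?

Suppose I have 50 URLs from which I want to download the corresponding resources, and let's say I want to do it one at a time. I know how to do that with Operation / OperationQueue, e.g. using an Operation subclass that doesn't declare itself finished until the download is complete. How would I do the same thing using Combine?

At the moment all that occurs to me is to keep a global list of the remaining URLs and pop one off, set up a pipeline for that one download, do the download, and in the sink of the pipeline, repeat. That doesn't seem very Combine-like.

I did try making an array of the URLs and mapping it to an array of publishers. I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloading simultaneously. There isn't any Combine way to walk the array in a controlled manner, or is there?

(I also imagined doing something with Future, but I became hopelessly confused. I'm not used to this way of thinking.)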

【Discussion】:

【Answer 1】:

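In all of the other reactive frameworks this is really easy; you just use concat to concatenate and flatten the results in one step, and then you can reduce the results into a final array. Apple makes this difficult because Publishers.Concatenate has no overload that accepts an array of publishers. There is similar weirdness with Publishers.Merge. I have a feeling this has to do with the fact that they return nested generic publishers instead of a single generic type like the Rx Observable. I guess you can call Concatenate in a loop and then reduce the concatenated results into a single array, but I really hope they address this in the next release. There is certainly a need to concatenate more than 2 publishers and to merge more than 4 publishers (and the overloads for these two operators aren't even consistent, which is just weird).

EDIT:

I came back to this and found that you can indeed concatenate an arbitrary array of publishers and they will emit in sequence. I have no idea why there isn't a function like ConcatenateMany to do this for you, but as long as you are willing to use a type-erased publisher it's not that hard to write one yourself. This example shows that merge emits in temporal order while concat emits in the order of combination: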

import PlaygroundSupport
import SwiftUI
import Combine

let p = Just<Int>(1).append(2).append(3).delay(for: .seconds(0.25), scheduler: RunLoop.main).eraseToAnyPublisher()
let q = Just<Int>(4).append(5).append(6).eraseToAnyPublisher()
let r = Just<Int>(7).append(8).append(9).delay(for: .seconds(0.5), scheduler: RunLoop.main).eraseToAnyPublisher()
let concatenated: AnyPublisher<Int, Never> = [q,r].reduce(p) { total, next in
  total.append(next).eraseToAnyPublisher()
}

var subscriptions = Set<AnyCancellable>()

concatenated
  .sink(receiveValue: { v in
    print("concatenated: \(v)")
  }).store(in: &subscriptions)

Publishers
  .MergeMany([p,q,r])
  .sink(receiveValue: { v in
    print("merge: \(v)")
  }).store(in: &subscriptions)
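
The "reduce the results into a final array" step mentioned at the top of this answer can be had by chaining collect() onto the concatenated publisher. The following is only a minimal sketch, assuming synchronous sequence publishers in place of real downloads; the names parts, joined and all are made up for illustration:

```swift
import Combine

// Three stand-in publishers; real code would use network publishers instead.
let parts: [AnyPublisher<Int, Never>] = [
    [1, 2, 3].publisher.eraseToAnyPublisher(),
    [4, 5, 6].publisher.eraseToAnyPublisher(),
    [7, 8, 9].publisher.eraseToAnyPublisher()
]

// Fold the array into one publisher. Each Concatenate subscribes to its
// suffix only after its prefix has finished.
let joined = parts.dropFirst().reduce(parts[0]) { prefix, suffix in
    Publishers.Concatenate(prefix: prefix, suffix: suffix).eraseToAnyPublisher()
}

var all: [Int] = []
let cancellable = joined
    .collect()              // gather every value into a single array
    .sink { all = $0 }

print(all)
```

Because collect() waits for completion, the array arrives only once the last publisher has finished.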

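【Discussion】:

Yes, you probably guessed that I chose a big number like 50 intentionally.

There is a MergeMany. I don't understand why there isn't a ConcatenateMany. RxSwift has Observable.concat and ReactiveSwift has flatMap(.concat), so this is strange; maybe I am missing something. I'll keep looking: developer.apple.com/documentation/combine/publishers/mergemany

Would concat serialize (in the other reactive frameworks)?

Yes. For a sequence of sequences you have only one way of flattening, i.e. putting the elements of one inner sequence after another, just like Sequence.flatMap in Swift. When you have an asynchronous sequence, you have to consider the temporal dimension when flattening. So you can either emit the elements from all of the inner sequences in temporal order (merge) or emit the elements from each inner sequence in the order of the sequences (concat). See the marble diagrams: rxmarbles.com/#concat vs rxmarbles.com/#merge

Note that .append is an operator that creates a Publishers.Concatenate.

【Answer 2】: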

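Here is a one-page playground that sketches a possible approach. The main idea is to transform the async API calls into a chain of Future publishers, thus making a serial pipeline.

Input: a range of Int from 1 to 10 that are converted to strings asynchronously on a background queue

Demo of a direct call to the async API: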

let group = DispatchGroup()
inputValues.forEach {
    group.enter()
    asyncCall(input: $0) { (output, _) in
        print(">> \(output), in \(Thread.current)")
        group.leave()
    }
}
group.wait()

Output:

>> 1, in <NSThread: 0x7fe76264fff0>{number = 4, name = (null)}
>> 3, in <NSThread: 0x7fe762446b90>{number = 3, name = (null)}
>> 5, in <NSThread: 0x7fe7624461f0>{number = 5, name = (null)}
>> 6, in <NSThread: 0x7fe762461ce0>{number = 6, name = (null)}
>> 10, in <NSThread: 0x7fe76246a7b0>{number = 7, name = (null)}
>> 4, in <NSThread: 0x7fe764c37d30>{number = 8, name = (null)}
>> 7, in <NSThread: 0x7fe764c37cb0>{number = 9, name = (null)}
>> 8, in <NSThread: 0x7fe76246b540>{number = 10, name = (null)}
>> 9, in <NSThread: 0x7fe7625164b0>{number = 11, name = (null)}
>> 2, in <NSThread: 0x7fe764c37f50>{number = 12, name = (null)}

Demo of the Combine pipeline:

Output:

>> got 1
>> got 2
>> got 3
>> got 4
>> got 5
>> got 6
>> got 7
>> got 8
>> got 9
>> got 10
>>>> finished with true

Code:

import Cocoa
import Combine
import PlaygroundSupport

// Assuming there is some Asynchronous API with
// (eg. process Int input value during some time and generates String result)
func asyncCall(input: Int, completion: @escaping (String, Error?) -> Void) {
    DispatchQueue.global(qos: .background).async {
        sleep(.random(in: 1...5)) // wait for random Async API output
        completion("\(input)", nil)
    }
}

// There are some input values to be processed serially
let inputValues = Array(1...10)

// Prepare one pipeline item based on Future, which transforms Async -> Sync
func makeFuture(input: Int) -> AnyPublisher<Bool, Error> {
    Future<String, Error> { promise in
        asyncCall(input: input) { (value, error) in
            if let error = error {
                promise(.failure(error))
            } else {
                promise(.success(value))
            }
        }
    }
    .receive(on: DispatchQueue.main)
    .map {
        print(">> got \($0)") // << side effect of pipeline item
        return true
    }
    .eraseToAnyPublisher()
}

// Create pipeline transforming input values into chain of Future publishers
var subscribers = Set<AnyCancellable>()
let pipeline =
    inputValues
    .reduce(nil as AnyPublisher<Bool, Error>?) { (chain, value) in
        if let chain = chain {
            return chain.flatMap { _ in
                makeFuture(input: value)
            }.eraseToAnyPublisher()
        } else {
            return makeFuture(input: value)
        }
    }

// Execute pipeline
pipeline?
    .sink(receiveCompletion: { _ in
        // << do something on completion if needed
    }) { output in
        print(">>>> finished with \(output)")
    }
    .store(in: &subscribers)

PlaygroundPage.current.needsIndefiniteExecution = true
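
One detail worth keeping in mind with this approach: a Future starts its work as soon as it is created, not when it is subscribed to. The pipeline above is safe because makeFuture is only called inside the flatMap closure, but an array of Futures built up front would start them all at once. A minimal sketch of one way around that, assuming synchronous stand-in jobs (eagerJob, deferredJob and started are hypothetical names, not part of the answer):

```swift
import Combine

var started: [Int] = []

// A Future runs its closure immediately on creation...
func eagerJob(_ n: Int) -> Future<Int, Never> {
    Future { promise in
        started.append(n)       // record when the work really begins
        promise(.success(n))
    }
}

// ...so wrapping it in Deferred postpones creation until subscription.
func deferredJob(_ n: Int) -> Deferred<Future<Int, Never>> {
    Deferred { eagerJob(n) }
}

let jobs = (1...3).map(deferredJob)
let startedBeforeSubscribing = started   // nothing has run yet

var received: [Int] = []
let cancellable = jobs.publisher
    .flatMap(maxPublishers: .max(1)) { $0 }   // subscribe to one job at a time
    .sink { received.append($0) }

print(startedBeforeSubscribing, started, received)
```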

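【Discussion】:

【Answer 3】:

I've only briefly tested this, but at first pass it appears that each request waits for the previous request to finish before starting.

I'm posting this solution in search of feedback. Please be critical if this isn't a good solution.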

extension Collection where Element: Publisher {

    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        // If the collection is empty, we can't just create an arbitrary publisher
        // so we return nil to indicate that we had nothing to serialize.
        if isEmpty { return nil }

        // We know at this point that it's safe to grab the first publisher.
        let first = self.first!

        // If there was only a single publisher then we can just return it.
        if count == 1 { return first.eraseToAnyPublisher() }

        // We're going to build up the output starting with the first publisher.
        var output = first.eraseToAnyPublisher()

        // We iterate over the rest of the publishers (skipping over the first.)
        for publisher in self.dropFirst() {
            // We build up the output by appending the next publisher.
            output = output.append(publisher).eraseToAnyPublisher()
        }

        return output
    }
}

A more concise version of this solution (provided by @matt):

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}
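
To see that serialize() really waits for one publisher to finish before subscribing to the next, it can be driven by hand. This is only an illustrative sketch: the two PassthroughSubjects stand in for real downloads, and the extension is repeated so the snippet compiles on its own.

```swift
import Combine

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}

// Hand-driven stand-ins for two asynchronous operations.
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()

var received: [Int] = []
let cancellable = [first, second].serialize()?.sink { received.append($0) }

second.send(99)                     // dropped: `second` is not subscribed to yet
first.send(1)
first.send(completion: .finished)   // only now does the chain subscribe to `second`
second.send(2)

print(received)
```

The value 99 never arrives because the chain has not subscribed to the second subject at that point.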

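【Discussion】:

Excellent, thanks. append is exactly what I was looking for.

Your code can be tightened up considerably; in particular, there is no need to return prematurely in the case where count == 1, because in that case dropFirst will be empty and we just won't loop. And there is no need to maintain the output variable, because we can use reduce instead of for...in. See my answer for a tighter rendering.

【Answer 4】:

You could create a custom subscriber whose receive method returns Subscribers.Demand.max(1). In that case the subscriber requests the next value only after it has received one. The example uses Int.publisher, but a random delay in the map mimics network traffic :-)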

import PlaygroundSupport
import SwiftUI
import Combine

class MySubscriber: Subscriber {
  typealias Input = String
  typealias Failure = Never

  func receive(subscription: Subscription) {
    print("Received subscription", Thread.current.isMainThread)
    subscription.request(.max(1))
  }

  func receive(_ input: Input) -> Subscribers.Demand {
    print("Received input: \(input)", Thread.current.isMainThread)
    return .max(1)
  }

  func receive(completion: Subscribers.Completion<Never>) {
    DispatchQueue.main.async {
        print("Received completion: \(completion)", Thread.current.isMainThread)
        PlaygroundPage.current.finishExecution()
    }
  }
}

(110...120)
    .publisher.receive(on: DispatchQueue.global())
    .map {
        print(Thread.current.isMainThread, Thread.current)
        usleep(UInt32.random(in: 10000 ... 1000000))
        return String(format: "%02x", $0)
    }
    .subscribe(on: DispatchQueue.main)
    .subscribe(MySubscriber())

print("Hello")

PlaygroundPage.current.needsIndefiniteExecution = true

The playground prints ...

Hello
Received subscription true
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 6e false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 6f false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 70 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 71 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 72 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 73 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 74 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 75 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 76 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 77 false
false <NSThread: 0x600000053400>{number = 3, name = (null)}
Received input: 78 false
Received completion: finished true

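UPDATE: I finally found .flatMap(maxPublishers: ), which prompts me to update this interesting topic with a slightly different approach. Note that I am using a global queue for scheduling, not just a random delay, to make sure that receiving a serialized stream is not "random" or "lucky" behavior :-)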

import PlaygroundSupport
import Combine
import Foundation

PlaygroundPage.current.needsIndefiniteExecution = true

let A = (1 ... 9)
    .publisher
    .flatMap(maxPublishers: .max(1)) { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: DispatchQueue.global())
        }
}
.sink { value in
    print(value, "A")
}

let B = (1 ... 9)
    .publisher
    .flatMap { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: RunLoop.main)
        }
}
.sink { value in
    print("     ",value, "B")
}

prints

1 A
      4 B
      5 B
      7 B
      1 B
      2 B
      8 B
      6 B
2 A
      3 B
      9 B
3 A
4 A
5 A
6 A
7 A
8 A
9 A

Based on what is written here,

.serialize()?

as defined in the accepted answer by Clay Ellis can be replaced by

.publisher.flatMap(maxPublishers: .max(1)){$0}

while the "unserialized" version must use

.publisher.flatMap{$0}
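
The difference between the two flatMap forms can be shown without any networking. In this rough sketch two hand-driven PassthroughSubjects (hypothetical stand-ins for downloads) feed both variants at once:

```swift
import Combine

let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()

var serial: [Int] = []
var concurrent: [Int] = []

// Serialized: flatMap subscribes to `second` only after `first` has finished.
let s = [first, second].publisher
    .flatMap(maxPublishers: .max(1)) { $0 }
    .sink { serial.append($0) }

// Default: flatMap subscribes to both subjects right away.
let c = [first, second].publisher
    .flatMap { $0 }
    .sink { concurrent.append($0) }

second.send(2)                      // seen only by the concurrent pipeline
first.send(1)
first.send(completion: .finished)   // the serial pipeline now moves on to `second`
second.send(3)
second.send(completion: .finished)

print(serial, concurrent)
```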

A "real world example"

import PlaygroundSupport
import Foundation
import Combine

let path = "postman-echo.com/get"
let urls: [URL] = "... which proves the downloads are happening serially .-)".map(String.init).compactMap { (parameter) in
    var components = URLComponents()
    components.scheme = "https"
    components.path = path
    components.queryItems = [URLQueryItem(name: parameter, value: nil)]
    return components.url
}
//["https://postman-echo.com/get?]
struct Postman: Decodable {
    var args: [String: String]
}


let collection = urls.compactMap { value in
        URLSession.shared.dataTaskPublisher(for: value)
        .tryMap { data, response -> Data in
            return data
        }
        .decode(type: Postman.self, decoder: JSONDecoder())
        .catch {_ in
            Just(Postman(args: [:]))
    }
}

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}

var streamA = ""
let A = collection
    .publisher.flatMap{$0}

    .sink(receiveCompletion: { (c) in
        print(streamA, "     ", c, "    .publisher.flatMap{$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamA)
    })


var streamC = ""
let C = collection
    .serialize()?

    .sink(receiveCompletion: { (c) in
        print(streamC, "     ", c, "    .serialize()?")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamC)
    })

var streamD = ""
let D = collection
    .publisher.flatMap(maxPublishers: .max(1)){$0}

    .sink(receiveCompletion: { (c) in
        print(streamD, "     ", c, "    .publisher.flatMap(maxPublishers: .max(1)){$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamD)
    })

PlaygroundPage.current.needsIndefiniteExecution = true

prints

.w.h i.c hporves ht edownloadsa erh appeninsg eriall y.-)       finished     .publisher.flatMap{$0}
... which proves the downloads are happening serially .-)       finished     .publisher.flatMap(maxPublishers: .max(1)){$0}
... which proves the downloads are happening serially .-)       finished     .serialize()?

It seems to me to be very useful in other scenarios as well. Try using the default value of maxPublishers in the next snippet and compare the results :-)

import Combine

let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()

let handle = subject
    .zip(sequencePublisher.print())
    //.publish
    .flatMap(maxPublishers: .max(1), { (pair)  in
        Just(pair)
    })
    .print()
    .sink { letters, digits in
        print(letters, digits)
    }

"Hello World!".map(String.init).forEach { (s) in
    subject.send(s)
}
subject.send(completion: .finished)

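【Discussion】:

@matt sink doesn't work any differently, it just returns Subscribers.Demand.unlimited on receive ... Maybe using the proper instrument, like a serial queue and Data.init?(contentsOf url: URL), is the best option in your scenario. If you need to sum two Ints, do you do it as [lhs: Int, rhs: Int].reduce .... ??? I would use Data.init?(contentsOf url: URL) inside receive(_ input:) of MySerialDownloaderSubscriber.

@matt please see the updated answer. Combine is exciting, but (at least for me) very hard to understand...

Yes, I see! With the maxPublishers parameter, we get to add back pressure. This goes with what I said in my question: "I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloading simultaneously." Well, with the maxPublishers parameter, they are not simultaneous.

@matt yes, sink calls the publisher's own subscriber with Subscribers.Demand.unlimited; in our use case, flatMap with .max(1) has the same effect as setting the publisher's own subscriber with a different value. I just added another example with a different scenario where it is very useful.

【Answer 5】:

From the original question:

I did try making an array of the URLs and mapping it to an array of publishers. I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloading simultaneously. There isn't any Combine way to walk the array in a controlled manner, or is there?


Here's a toy example that stands in for the real problem: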

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap() {$0}
    .sink {print($0)}.store(in:&self.storage)

This emits the integers from 1 to 10 in random order, arriving at random times. The goal is to do something with collection that will cause it to emit the integers from 1 to 10 in order.


Now we're going to change just one thing: in the line

.flatMap {$0}

we add the maxPublishers parameter:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap(maxPublishers:.max(1)) {$0}
    .sink {print($0)}.store(in:&self.storage)

Presto, we now do emit the integers from 1 to 10, in order, with random intervals between them.


Let's apply this to the original problem. To demonstrate, I need a fairly slow Internet connection and a fairly large resource to download. First, I'll do it with ordinary .flatMap:

let eph = URLSessionConfiguration.ephemeral
let session = URLSession(configuration: eph)
let url = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let collection = [url, url, url]
    .map {URL(string:$0)!}
    .map {session.dataTaskPublisher(for: $0)
        .eraseToAnyPublisher()
}
collection.publisher.setFailureType(to: URLError.self)
    .handleEvents(receiveOutput: {_ in print("start")})
    .flatMap() {$0}
    .map {$0.data}
    .sink(receiveCompletion: {comp in
        switch comp {
        case .failure(let err): print("error", err)
        case .finished: print("finished")
        }
    }, receiveValue: {_ in print("done")})
    .store(in:&self.storage)

The result is

start
start
start
done
done
done
finished

This shows that we are doing the three downloads simultaneously. Okay, now change

    .flatMap() {$0}

to

    .flatMap(maxPublishers:.max(1)) {$0}

The result now is:

start
done
start
done
start
done
finished

So we are now downloading serially, which is the problem originally to be solved.
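
One way to look at why this works is to log the demand that flatMap sends upstream. The sketch below is illustrative only: it uses handleEvents(receiveRequest:) on a small synchronous sequence instead of real downloads, and the variable names are made up.

```swift
import Combine

var serialRequests: [Subscribers.Demand] = []
var defaultRequests: [Subscribers.Demand] = []

// With maxPublishers: .max(1), flatMap asks upstream for one publisher at a time.
let serial = (1...3).publisher
    .handleEvents(receiveRequest: { serialRequests.append($0) })
    .flatMap(maxPublishers: .max(1)) { Just($0) }
    .sink { _ in }

// With the default, flatMap asks upstream for everything at once.
let concurrent = (1...3).publisher
    .handleEvents(receiveRequest: { defaultRequests.append($0) })
    .flatMap { Just($0) }
    .sink { _ in }

print(serialRequests, defaultRequests)
```

This is the back pressure mentioned in the discussion above: the upstream array publisher is never asked for a second download until the current one has completed.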


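append

In keeping with the principle of TIMTOWTDI, we can instead chain the publishers with append to serialize them:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
let pub = collection.dropFirst().reduce(collection.first!) {
    return $0.append($1).eraseToAnyPublisher()
}

The result is a publisher that serializes the delayed publishers in the original collection. Let's prove it by subscribing to it:

pub.sink {print($0)}.store(in:&self.storage)

Sure enough, the integers now arrive in order (with random intervals between).


We can encapsulate the creation of pub from a collection of publishers with an extension on Collection, as suggested by Clay Ellis: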

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}

【Discussion】:

【Answer 6】:

Use flatMap(maxPublishers:transform:) with .max(1), e.g.

func imagesPublisher(for urls: [URL]) -> AnyPublisher<UIImage, URLError> {
    Publishers.Sequence(sequence: urls.map { self.imagePublisher(for: $0) })
        .flatMap(maxPublishers: .max(1)) { $0 }
        .eraseToAnyPublisher()
}

Where

func imagePublisher(for url: URL) -> AnyPublisher<UIImage, URLError> {
    URLSession.shared.dataTaskPublisher(for: url)
        .compactMap { UIImage(data: $0.data) }
        .receive(on: RunLoop.main)
        .eraseToAnyPublisher()
}

var imageRequests: AnyCancellable?

func fetchImages() {
    imageRequests = imagesPublisher(for: urls).sink { completion in
        switch completion {
        case .finished:
            print("done")
        case .failure(let error):
            print("failed", error)
        }
    } receiveValue: { image in
        // do whatever you want with the images as they come in
    }
}

That resulted in: (image not preserved)

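But we should recognize that you take a big performance hit doing them sequentially like that. For example, if I bump it up to 6 at a time, it's more than twice as fast: (image not preserved)

Personally, I'd recommend downloading sequentially only if you absolutely must (which, when downloading a series of images/files, is almost certainly not the case). Yes, performing requests concurrently can result in them not finishing in a particular order, but we just use a structure that is order-independent (e.g. a dictionary rather than a simple array), and the performance gains are so significant that it's generally worth it.

But if you want them downloaded sequentially, the maxPublishers parameter can achieve that.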
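
The order-independent structure mentioned above could look roughly like this. It is a sketch under assumptions: fakeDownload is a hypothetical synchronous stand-in for a real request, the example.com URLs are placeholders, and 6 is simply the concurrency figure quoted in this answer.

```swift
import Combine
import Foundation

// Hypothetical stand-in for a real download: echoes the URL's text as Data.
func fakeDownload(_ url: URL) -> AnyPublisher<(URL, Data), Never> {
    Just((url, Data(url.absoluteString.utf8))).eraseToAnyPublisher()
}

let urls = (1...10).compactMap { URL(string: "https://example.com/\($0).png") }

// Results are keyed by URL, so the order in which they arrive does not matter.
var results: [URL: Data] = [:]
let cancellable = urls.publisher
    .flatMap(maxPublishers: .max(6)) { fakeDownload($0) }   // at most 6 in flight
    .sink { pair in results[pair.0] = pair.1 }

print(results.count)
```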

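【Discussion】:

Yes, that's what my answer already says: ***.com/a/59889993/341994 as well as the answer I awarded the bounty to, ***.com/a/59889174/341994

And see now also my book apeth.com/UnderstandingCombine/operators/…

By the way, speaking of sequential, I have made great use of your sequential asynchronous Operation for a different task; thanks for writing it.

@matt - Lol. I confess that I didn't see that you had found the maxPublishers option. And I wouldn't have droned on about "don't do serial" if I'd noticed it was you (as I know you completely understand the pros and cons of serial vs concurrent). I literally only saw "I want to download one file at a time", I had recently stumbled across the maxPublishers option for something else I was doing (namely, providing a modern solution to this question), and I thought I'd share the Combine solution I had come up with. I didn't mean to step on your toes.

Yes, it was the solution referred to at ***.com/a/48104095/1271826 that I was talking about before; I found that very helpful.

【Answer 7】:

What about a dynamic array of URLs, something like a data bus?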

      var array: [AnyPublisher<Data, URLError>] = []

      array.append(Task())

      array.publisher
         .flatMap { $0 }
         .sink {

         }
         // it will be finished
      array.append(Task())
      array.append(Task())
      array.append(Task())

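【Discussion】:

【Answer 8】:

Another approach, if you want to collect all the download results in order to know which ones failed and which ones succeeded, is to write a custom publisher that looks like this: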

extension Publishers {
    struct Serialize<Upstream: Publisher>: Publisher {
        typealias Output = [Result<Upstream.Output, Upstream.Failure>]
        typealias Failure = Never

        let upstreams: [Upstream]

        init<C: Collection>(_ upstreams: C) where C.Element == Upstream {
            self.upstreams = Array(upstreams)
        }

        init(_ upstreams: Upstream...) {
            self.upstreams = upstreams
        }

        func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
            guard let first = upstreams.first else { return Empty().subscribe(subscriber) }
            first
                .map { Result<Upstream.Output, Upstream.Failure>.success($0) }
                .catch { Just(Result<Upstream.Output, Upstream.Failure>.failure($0)) }
                .map { [$0] }
                .append(Serialize(upstreams.dropFirst()))
                .collect()
                .map { $0.flatMap { $0 } }
                .subscribe(subscriber)
        }
    }
}

extension Collection where Element: Publisher {
    func serializedPublishers() -> Publishers.Serialize<Element> {
        .init(self)
    }
}

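The publisher takes the first download task, converts its output/failure to a Result instance, and prepends it to the "recursive" call for the rest of the list.

Usage: Publishers.Serialize(listOfDownloadTasks), or listOfDownloadTasks.serializedPublishers().

One minor inconvenience of this implementation is that the Result instance needs to be wrapped in an array, only to be flattened three steps later in the pipeline. Perhaps someone can suggest a better alternative.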
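
One possible alternative, offered only as a sketch and not as the answer author's method, is to lean on flatMap(maxPublishers:) from the earlier answers, which avoids both the recursion and the array wrapping. The helper collectResults and the error type Oops are hypothetical names:

```swift
import Combine

// Hypothetical helper: runs the publishers one at a time and records each
// outcome as a Result, so one failure does not end the whole pipeline.
func collectResults<P: Publisher>(
    _ publishers: [P]
) -> AnyPublisher<[Result<P.Output, P.Failure>], Never> {
    publishers.publisher
        .flatMap(maxPublishers: .max(1)) { upstream -> AnyPublisher<Result<P.Output, P.Failure>, Never> in
            upstream
                .map { Result<P.Output, P.Failure>.success($0) }
                .catch { Just(Result<P.Output, P.Failure>.failure($0)) }
                .eraseToAnyPublisher()
        }
        .collect()
        .eraseToAnyPublisher()
}

struct Oops: Error, Equatable {}

// Synchronous stand-ins for download tasks; the middle one fails.
let tasks: [AnyPublisher<Int, Oops>] = [
    Just(1).setFailureType(to: Oops.self).eraseToAnyPublisher(),
    Fail<Int, Oops>(error: Oops()).eraseToAnyPublisher(),
    Just(3).setFailureType(to: Oops.self).eraseToAnyPublisher()
]

var collected: [Result<Int, Oops>] = []
let cancellable = collectResults(tasks).sink { collected = $0 }

print(collected)
```

Unlike the custom publisher, this version would record several successes for an upstream that emits more than one value.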

【Discussion】:
