如何在组合中安排异步调用的同步序列?

Posted

技术标签:

【中文标题】如何在组合中安排异步调用的同步序列?【英文标题】:How to schedule a synchronous sequence of asynchronous calls in Combine? 【发布时间】:2019-06-18 21:11:36 【问题描述】:

我想在我的应用中处理一系列网络调用。每个调用都是异步的,flatMap() 似乎是正确的调用。但是,flatMap 同时处理所有参数,我需要按顺序调用——下一个网络调用仅在前一个网络调用完成后才开始。我查找了 RxSwift answer,但它需要 concatMap 运算符,而 Combine 没有。这是我正在尝试做的粗略概述,但flatMap 会同时触发所有myCalls

Publishers.Sequence(sequence: urls)
  .flatMap  url in
    Publishers.Future<Result, Error>  callback in 
        myCall  data, error in 
            if let data = data 
                callback(.success(data))
             else if let error = error 
                callback(.failure(error))
            
        
    
  

【问题讨论】:

【参考方案1】:

在操场上尝试了一段时间后,我相信我找到了解决方案,但是如果您有更好的想法,请分享。解决方法是在flatMap中添加maxPublishers参数,并将值设置为max(1)

Publishers.Sequence(sequence: urls)
  .flatMap(maxPublishers: .max(1)) // <<<<--- here
   url in 
    Publishers.Future<Result, Error>  callback in 
      myCall  data, error in 
        if let data = data 
          callback(.success(data))
         else if let error = error 
          callback(.failure(error))
        
      
    
  

【讨论】:

【参考方案2】:

您还可以在 observable 上使用 prepend(_:) method 来创建连接序列,我想这类似于 RxSwift 中的 Observable.concat(:)

这是一个简单的示例,我试图模拟您的用例,其中我有几个不同的序列,它们依次跟随。

func dataTaskPublisher(_ urlString: String) -> AnyPublisher<(data: Data, response: URLResponse), Never> 
    let interceptedError = (Data(), URLResponse())
    return Publishers.Just(URL(string: urlString)!)
                        .flatMap 
                            URLSession.shared
                                        .dataTaskPublisher(for: $0)
                                        .replaceError(with: interceptedError)
                        
                        .eraseToAnyPublisher()


let publisher: AnyPublisher<(data: Data, response: URLResponse), Never> = Publishers.Empty().eraseToAnyPublisher()


for urlString in [
    "http://ipv4.download.thinkbroadband.com/1MB.zip",
    "http://ipv4.download.thinkbroadband.com/50MB.zip",
    "http://ipv4.download.thinkbroadband.com/10MB.zip"
    ] 
        publisher = publisher.prepend(dataTaskPublisher(urlString)).eraseToAnyPublisher()


publisher.sink(receiveCompletion:  completion in
    print("Completed")
)  response in
    print("Data: \(response)")

在这里,prepend(_:) 运算符为序列添加前缀,因此,前置序列首先开始,完成,下一个序列开始。

如果您运行下面的代码,您应该会看到首先下载了 10 MB 的文件,然后是 50 MB,最后是 1 MB,因为最后一个前置文件首先开始,依此类推。

prepend(_:) 运算符还有其他变体,它采用数组,但这似乎不能按顺序工作。

【讨论】:

谢谢。我也尝试了前置方法。有用。但是,对于许多 url (1000+),构造基于 prepend(或附加)的发布者的循环在计算上变得昂贵。顺便说一句,您可以用 reduce 运算符替换循环,因此它也成为流的一部分。

以上是关于如何在组合中安排异步调用的同步序列?的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot集成篇 异步调用Async

springboot+async异步接口实现和调用

Sprint Boot如何基于Redis发布订阅实现异步消息系统的同步调用?

Linux下同步模式异步模式阻塞调用非阻塞调用总结

[概念] 同步 异步 阻塞 非阻塞

获取 异步执行调用的结果