组合框架:如何在继续之前异步处理数组的每个元素

Posted

技术标签:

【中文标题】组合框架:如何在继续之前异步处理数组的每个元素【英文标题】:Combine framework: how to process each element of array asynchronously before proceeding 【发布时间】:2020-09-02 13:53:23 【问题描述】:

我在使用 ios 组合框架时有点精神障碍。

我正在将一些代码从远程 API 的“手动”获取转换为使用 Combine。基本上,API 是 SQL 和 REST(实际上是 Salesforce,但这与问题无关)。代码用来调用一个带有完成处理程序的 REST 查询方法。我正在做的是用组合未来替换它。到目前为止,一切顺利。

当以下情况发生(并且经常发生)时,就会出现问题:

    我们执行 REST 查询并返回一组“对象”。

    但这些“对象”并未完全填充。它们中的每一个都需要来自某个相关对象的附加数据。因此,对于每个“对象”,我们使用来自该“对象”的信息执行另一个 REST 查询,从而为我们提供 另一个“对象”数组。

    这可能允许我们完成第一个“对象”的填充,也可能不允许 - 否则,我们可能必须使用来自每个 第二个的信息进行 另一个 REST 查询 “对象”,等等。

结果是很多这样的代码结构(这是伪代码):

func fetchObjects(completion: @escaping ([Object] -> Void) 
    let restQuery = ...
    RESTClient.performQuery(restQuery)  results in
        let partialObjects = results.map  ... 
        let group = DispatchGroup()
        for partialObject in partialObjects 
            let restQuery = ... // something based on partialObject
            group.enter()
            RESTClient.performQuery(restQuery)  results in
                group.leave()
                let partialObjects2 = results.map  ... 
                partialObject.property1 = // something from partialObjects2
                partialObject.property2 = // something from partialObjects2
                // and we could go down yet _another_ level in some cases
            
        
        group.notify 
            completion([partialObjects])
        
    

每次我在伪代码中说results in,这就是异步网络调用的完成处理程序。

好的,我已经很清楚如何在 Combine 中链接异步调用了,例如使用 Futures 和 flatMap(又是伪代码):

let future1 = Future...
future1.map 
    // do something
.flatMap 
    let future2 = Future...
    return future2.map 
        // do something
    

// ...

在该代码中,我们形成future2 的方式可以取决于我们从future1 的执行中收到的值,并且在future2 上的map 中,我们可以在收到之前修改从上游收到的内容通过管道传递。没问题。这一切都很漂亮。

但这并没有告诉我我在预组合代码中所做的事情,即循环。在这里,我在一个 loop 中执行 多个 异步调用,在继续之前由 DispatchGroup 固定。问题是:

什么是组合模式?

记住情况。我有一个对象的 array。我想 loop 遍历该数组,对循环中的 each 对象进行异步调用,异步获取新信息并在此基础上修改该对象,然后再继续管道。每个循环都可能涉及一个进一步的嵌套循环,甚至异步收集更多信息:

Fetch info from online database, it's an array
   |
   V
For each element in the array, fetch _more_ info, _that's_ an array
   |
   V
For each element in _that_ array, fetch _more_ info
   |
   V
Loop thru the accumulated info and populate that element of the original array 

执行此操作的旧代码看起来很糟糕,充满了嵌套的完成处理程序和由 DispatchGroup enter/leave/notify 保存的循环。 但它起作用了。我无法让我的组合代码以相同的方式工作。我该怎么做?基本上我的管道输出是一个数组,我觉得我需要将该数组拆分为单个元素,对每个元素执行 异步 操作,然后将这些元素重新组合到一个数组中。怎么样?


我一直在解决这个问题的方法有效,但无法扩展,尤其是当异步调用需要在管道链中返回几个步骤的信息时。我一直在做这样的事情(我从https://***.com/a/58708381/341994得到这个想法):

    一组对象从上游到达。

    我将 flatMapmap 数组输入到发布者数组中,每个发布者都以 Future 为首,该 Future 获取与 one 对象相关的更多在线内容,然后是产生修改的对象的管道。

    现在我有一个管道数组,每个管道生成一个对象。我 merge 该数组并从 flatMap 生成该发布者(MergeMany)。

    我将collect 得到的值返回到一个数组中。

但这似乎仍然需要大量工作,更糟糕的是,当每个子管道本身需要生成一组子管道时,它无法扩展。这一切都变得难以理解,过去很容易到达完成块的信息(因为 Swift 的范围规则)不再到达主管道的后续步骤(或者因为我在管道中传递越来越大的元组而很难到达)。

必须有一些简单的组合模式才能做到这一点,但我完全错过了它。请告诉我它是什么。

【问题讨论】:

只是出于好奇,需要实体数组吗?仅使用 flatMap,您将在每个实体完成时一次获得一个。实体可以在完成时更新,而不是等到一切都完成。 @JefferyThomas 好吧,我想这取决于您所说的“需要”是什么意思。上游 API 返回一个数组,下游视图控制器需要一个数组。所以管道的端点并不完全取决于我,如果你明白我的意思的话。 @JefferyThomas 我也不知道你说的“只有flatMap”是什么意思。仅使用 flatMap 不会展平数组。 哦,是的,我使用 MergeMany 来组合 flatMap 中的发布者数组。这是一个重要的细节。 @JefferyThomas 所以你指的是我已经在做的事情。但那是我不想做的。 【参考方案1】:

使用公认的答案,我最终得到了这个结构:

head // [Entity]
    .flatMap  entities -> AnyPublisher<Entity, Error> in
        Publishers.Sequence(sequence: entities).eraseToAnyPublisher()
    .flatMap  entity -> AnyPublisher<Entity, Error> in
        self.makeFuture(for: entity) // [Derivative]
            .flatMap  derivatives -> AnyPublisher<Derivative, Error> in
                Publishers.Sequence(sequence: derivatives).eraseToAnyPublisher()
            
            .flatMap  derivative -> AnyPublisher<Derivative2, Error> in
                self.makeFuture(for: derivative).eraseToAnyPublisher() // Derivative2
        .collect().map  derivative2s -> Entity in
            self.configuredEntity(entity, from: derivative2s)
        .eraseToAnyPublisher()
    .collect()

这正是我一直在寻找的优雅紧密度!所以这个想法是:

我们收到一个数组,我们需要异步处理每个元素。旧方法是 DispatchGroup 和 for...in 循环。等价的组合是:

for...in 行的等效项是 flatMap 和 Publishers.Sequence。

DispatchGroup(处理异步)的等价物是另一个flatMap(在单个元素上)和一些发布者。就我而言,我从基于我们刚刚收到的单个元素的 Future 开始。

末尾右花括号的等价物是collect(),等待所有元素处理完毕,再将数组重新组合在一起。

所以总结一下,模式是:

    flatMap 数组到序列。 flatMap 将单个元素发送给在该元素上启动异步操作的发布者。 根据需要继续该发布者的链。 collect 回到数组中。

通过嵌套该模式,我们可以利用 Swift 范围规则将我们需要处理的事物保持在范围内,直到我们获得足够的信息来生成已处理的对象。

【讨论】:

嗨@matt 这似乎是一个非常有趣的方法,我可以帮助我!您是否可以通过 Github gist 共享示例代码和一些虚拟数据进行测试?在我的用例中,我还获取一个包含 2 个嵌套数组的数据数组,每个数组都有一个要下载的图像,其中包含自己相关的嵌套数组。提前谢谢! 好的,我会尝试一下 tomm...如果有虚拟数据(即类似于您的用例的数组结构)来测试这些嵌套发布者以真正了解发生了什么,这将非常有帮助。谢谢 您好@matt,我尝试将您的解决方案应用于我的用例(进行了一些调整),并且确实成功地从 API 服务器获得了预期的响应。但是,我遇到了一个问题,我不知道如何将返回的 Future 对象数组分配为关联的数组实体。我在这里详细发布了我的问题:***.com/questions/62895539/…。任何帮助将不胜感激!谢谢 我们不能使用 head.publisher 代替 Publishers.Sequence 吗?我对 Combine 比较陌生,所以我不确定这是否是最近添加的。 @Aswath 要作为序列发布的内容是derivatives,而不是head。符号由您决定;我觉得这种方式更清晰。【参考方案2】:

您的最新编辑和下面的评论:

我实际上是在问是否存在相当于“在涉及多个异步步骤的这一步完成之前不要继续下一步”

我认为这种模式可以通过 .flatMap 到一个数组发布者 (Publishers.Sequence) 来实现,该发布者一个接一个地发出并完成,然后是所需的每个元素异步处理,并用 @ 完成987654322@,等待所有元素完成后再继续

所以,在代码中,假设我们有这些函数:

func getFoos() -> AnyPublisher<[Foo], Error>
func getPartials(for: Foo) -> AnyPublisher<[Partial], Error>
func getMoreInfo(for: Partial, of: Foo) -> AnyPublisher<MoreInfo, Error>

我们可以做到以下几点:

getFoos()
.flatMap  fooArr in 
    fooArr.publisher.setFailureType(to: Error.self)
 

// per-foo element async processing
.flatMap  foo in

  getPartials(for: foo)
    .flatMap  partialArr in
       partialArr.publisher.setFailureType(to: Error.self)
     

     // per-partial of foo async processing
    .flatMap  partial in

       getMoreInfo(for: partial, of: foo)
         // build completed partial with more info
         .map  moreInfo in
            var newPartial = partial
            newPartial.moreInfo = moreInfo
            return newPartial
         
     
     .collect()
     // build completed foo with all partials
     .map  partialArr in
        var newFoo = foo
        newFoo.partials = partialArr
        return newFoo
     

.collect()

(删除旧答案)

【讨论】:

谢谢。在我看来,这与我已经在做的没有什么不同。我也不得不求助于元组来保持所有信息在管道中一起流动。您的代码以失败类型为 Never 的主题开头并使用 要求失败类型为 Never 的 arrOfFoos.publisher,从而丢失错误信息;我不愿意这样做,这就是为什么我在数组上使用.map 也许我没有关注。这个管道从一个部分对象开始——它不关心你是怎么得到它的。但是您可以随时处理原始请求的错误(使用Catch 等),并且仅在成功的情况下将对象提供给fooSubject。这里和您所做的最大区别在于它创建了一个 Foo 管道,并按顺序将 Foo 对象输入其中,而不是创建并行管道。它也是一个线性组合(无嵌套),可以像您需要的那样使用 .Zip 传播上游值 你是指哪个.mapfuture1.map 只是将一个结果 - 一个数组 - 映射到另一个对象数组中。您可以修改您的部分对象,但是在将它们发送到他们自己的管道之前,您可以根据需要修改它们,尽管以 Never 的失败类型开始(这是有道理的,因为它们是真实的对象),不需要在没有失败类型的情况下结束。实际上,它允许您根据需要处理每个单独对象的错误,或者不处理它,视情况而定,并返回 `Result 或其他东西 // @matt 对,我想我喜欢这个。我相信我们仍在做同样的事情:flatMap 并使用 Publishers.Sequence 拆分数组,然后处理每个项目并在最后处理 collect 它们。但是通过嵌套这些,我们可以在不使用元组的情况下将它运行到我们需要的深度,因为我们现在拥有与 DispatchQueue 相同的 Swift 范围规则。让我尝试将我的代码转换为这种结构,然后看看...! 是的,这正是我需要的模式。这个基本想法——flatMap 到 Publishers.Sequence,处理每个项目以基于它的 Future 开始,并以 collect 结束——然后根据需要嵌套它们,这是我希望的干净外观。基本上我用for...in和一个DispatchGroup制作的同一个巢,它做同样的事情:在每个级别,我们不会通过collect,直到前面的东西改变了每个元素的原始数组。这就是我的问题所要求的!

以上是关于组合框架:如何在继续之前异步处理数组的每个元素的主要内容,如果未能解决你的问题,请参考以下文章

每个数组元素的异步调用并等待完成[重复]

在继续该功能之前等待保存

Java编程的逻辑 (94) - 组合式异步编程

Java编程的逻辑 (94) - 组合式异步编程

(94) 组合式异步编程 / 计算机程序的思维逻辑

Java组合式异步编程