组合框架:如何在继续之前异步处理数组的每个元素
Posted
技术标签:
【中文标题】组合框架:如何在继续之前异步处理数组的每个元素【英文标题】:Combine framework: how to process each element of array asynchronously before proceeding 【发布时间】:2020-09-02 13:53:23 【问题描述】:我在使用 ios 组合框架时有点精神障碍。
我正在将一些代码从远程 API 的“手动”获取转换为使用 Combine。基本上,API 是 SQL 和 REST(实际上是 Salesforce,但这与问题无关)。代码用来调用一个带有完成处理程序的 REST 查询方法。我正在做的是用组合未来替换它。到目前为止,一切顺利。
当以下情况发生(并且经常发生)时,就会出现问题:
我们执行 REST 查询并返回一组“对象”。
但这些“对象”并未完全填充。它们中的每一个都需要来自某个相关对象的附加数据。因此,对于每个“对象”,我们使用来自该“对象”的信息执行另一个 REST 查询,从而为我们提供 另一个“对象”数组。
这可能允许我们完成第一个“对象”的填充,也可能不允许 - 否则,我们可能必须使用来自每个 第二个的信息进行 另一个 REST 查询 “对象”,等等。
结果是很多这样的代码结构(这是伪代码):
func fetchObjects(completion: @escaping ([Object] -> Void)
let restQuery = ...
RESTClient.performQuery(restQuery) results in
let partialObjects = results.map ...
let group = DispatchGroup()
for partialObject in partialObjects
let restQuery = ... // something based on partialObject
group.enter()
RESTClient.performQuery(restQuery) results in
group.leave()
let partialObjects2 = results.map ...
partialObject.property1 = // something from partialObjects2
partialObject.property2 = // something from partialObjects2
// and we could go down yet _another_ level in some cases
group.notify
completion([partialObjects])
每次我在伪代码中说results in
,这就是异步网络调用的完成处理程序。
好的,我已经很清楚如何在 Combine 中链接异步调用了,例如使用 Futures 和 flatMap
(又是伪代码):
let future1 = Future...
future1.map
// do something
.flatMap
let future2 = Future...
return future2.map
// do something
// ...
在该代码中,我们形成future2
的方式可以取决于我们从future1
的执行中收到的值,并且在future2
上的map
中,我们可以在收到之前修改从上游收到的内容通过管道传递。没问题。这一切都很漂亮。
但这并没有告诉我我在预组合代码中所做的事情,即循环。在这里,我在一个 loop 中执行 多个 异步调用,在继续之前由 DispatchGroup 固定。问题是:
什么是组合模式?
记住情况。我有一个对象的 array。我想 loop 遍历该数组,对循环中的 each 对象进行异步调用,异步获取新信息并在此基础上修改该对象,然后再继续管道。每个循环都可能涉及一个进一步的嵌套循环,甚至异步收集更多信息:
Fetch info from online database, it's an array
|
V
For each element in the array, fetch _more_ info, _that's_ an array
|
V
For each element in _that_ array, fetch _more_ info
|
V
Loop thru the accumulated info and populate that element of the original array
执行此操作的旧代码看起来很糟糕,充满了嵌套的完成处理程序和由 DispatchGroup enter
/leave
/notify
保存的循环。 但它起作用了。我无法让我的组合代码以相同的方式工作。我该怎么做?基本上我的管道输出是一个数组,我觉得我需要将该数组拆分为单个元素,对每个元素执行 异步 操作,然后将这些元素重新组合到一个数组中。怎么样?
我一直在解决这个问题的方法有效,但无法扩展,尤其是当异步调用需要在管道链中返回几个步骤的信息时。我一直在做这样的事情(我从https://***.com/a/58708381/341994得到这个想法):
一组对象从上游到达。
我将 flatMap
和 map
数组输入到发布者数组中,每个发布者都以 Future 为首,该 Future 获取与 one 对象相关的更多在线内容,然后是产生修改的对象的管道。
现在我有一个管道数组,每个管道生成一个对象。我 merge
该数组并从 flatMap
生成该发布者(MergeMany)。
我将collect
得到的值返回到一个数组中。
但这似乎仍然需要大量工作,更糟糕的是,当每个子管道本身需要生成一组子管道时,它无法扩展。这一切都变得难以理解,过去很容易到达完成块的信息(因为 Swift 的范围规则)不再到达主管道的后续步骤(或者因为我在管道中传递越来越大的元组而很难到达)。
必须有一些简单的组合模式才能做到这一点,但我完全错过了它。请告诉我它是什么。
【问题讨论】:
只是出于好奇,需要实体数组吗?仅使用 flatMap,您将在每个实体完成时一次获得一个。实体可以在完成时更新,而不是等到一切都完成。 @JefferyThomas 好吧,我想这取决于您所说的“需要”是什么意思。上游 API 返回一个数组,下游视图控制器需要一个数组。所以管道的端点并不完全取决于我,如果你明白我的意思的话。 @JefferyThomas 我也不知道你说的“只有flatMap
”是什么意思。仅使用 flatMap
不会展平数组。
哦,是的,我使用 MergeMany 来组合 flatMap 中的发布者数组。这是一个重要的细节。
@JefferyThomas 所以你指的是我已经在做的事情。但那是我不想做的。
【参考方案1】:
使用公认的答案,我最终得到了这个结构:
head // [Entity]
.flatMap entities -> AnyPublisher<Entity, Error> in
Publishers.Sequence(sequence: entities).eraseToAnyPublisher()
.flatMap entity -> AnyPublisher<Entity, Error> in
self.makeFuture(for: entity) // [Derivative]
.flatMap derivatives -> AnyPublisher<Derivative, Error> in
Publishers.Sequence(sequence: derivatives).eraseToAnyPublisher()
.flatMap derivative -> AnyPublisher<Derivative2, Error> in
self.makeFuture(for: derivative).eraseToAnyPublisher() // Derivative2
.collect().map derivative2s -> Entity in
self.configuredEntity(entity, from: derivative2s)
.eraseToAnyPublisher()
.collect()
这正是我一直在寻找的优雅紧密度!所以这个想法是:
我们收到一个数组,我们需要异步处理每个元素。旧方法是 DispatchGroup 和 for...in
循环。等价的组合是:
for...in
行的等效项是 flatMap
和 Publishers.Sequence。
DispatchGroup(处理异步)的等价物是另一个flatMap
(在单个元素上)和一些发布者。就我而言,我从基于我们刚刚收到的单个元素的 Future 开始。
末尾右花括号的等价物是collect()
,等待所有元素处理完毕,再将数组重新组合在一起。
所以总结一下,模式是:
flatMap
数组到序列。
flatMap
将单个元素发送给在该元素上启动异步操作的发布者。
根据需要继续该发布者的链。
collect
回到数组中。
通过嵌套该模式,我们可以利用 Swift 范围规则将我们需要处理的事物保持在范围内,直到我们获得足够的信息来生成已处理的对象。
【讨论】:
嗨@matt 这似乎是一个非常有趣的方法,我可以帮助我!您是否可以通过 Github gist 共享示例代码和一些虚拟数据进行测试?在我的用例中,我还获取一个包含 2 个嵌套数组的数据数组,每个数组都有一个要下载的图像,其中包含自己相关的嵌套数组。提前谢谢! 好的,我会尝试一下 tomm...如果有虚拟数据(即类似于您的用例的数组结构)来测试这些嵌套发布者以真正了解发生了什么,这将非常有帮助。谢谢 您好@matt,我尝试将您的解决方案应用于我的用例(进行了一些调整),并且确实成功地从 API 服务器获得了预期的响应。但是,我遇到了一个问题,我不知道如何将返回的 Future 对象数组分配为关联的数组实体。我在这里详细发布了我的问题:***.com/questions/62895539/…。任何帮助将不胜感激!谢谢 我们不能使用 head.publisher 代替 Publishers.Sequence 吗?我对 Combine 比较陌生,所以我不确定这是否是最近添加的。 @Aswath 要作为序列发布的内容是derivatives
,而不是head
。符号由您决定;我觉得这种方式更清晰。【参考方案2】:
您的最新编辑和下面的评论:
我实际上是在问是否存在相当于“在涉及多个异步步骤的这一步完成之前不要继续下一步”
我认为这种模式可以通过 .flatMap
到一个数组发布者 (Publishers.Sequence) 来实现,该发布者一个接一个地发出并完成,然后是所需的每个元素异步处理,并用 @ 完成987654322@,等待所有元素完成后再继续
所以,在代码中,假设我们有这些函数:
func getFoos() -> AnyPublisher<[Foo], Error>
func getPartials(for: Foo) -> AnyPublisher<[Partial], Error>
func getMoreInfo(for: Partial, of: Foo) -> AnyPublisher<MoreInfo, Error>
我们可以做到以下几点:
getFoos()
.flatMap fooArr in
fooArr.publisher.setFailureType(to: Error.self)
// per-foo element async processing
.flatMap foo in
getPartials(for: foo)
.flatMap partialArr in
partialArr.publisher.setFailureType(to: Error.self)
// per-partial of foo async processing
.flatMap partial in
getMoreInfo(for: partial, of: foo)
// build completed partial with more info
.map moreInfo in
var newPartial = partial
newPartial.moreInfo = moreInfo
return newPartial
.collect()
// build completed foo with all partials
.map partialArr in
var newFoo = foo
newFoo.partials = partialArr
return newFoo
.collect()
(删除旧答案)
【讨论】:
谢谢。在我看来,这与我已经在做的没有什么不同。我也不得不求助于元组来保持所有信息在管道中一起流动。您的代码以失败类型为 Never 的主题开头并使用 要求失败类型为 Never 的arrOfFoos.publisher
,从而丢失错误信息;我不愿意这样做,这就是为什么我在数组上使用.map
。
也许我没有关注。这个管道从一个部分对象开始——它不关心你是怎么得到它的。但是您可以随时处理原始请求的错误(使用Catch
等),并且仅在成功的情况下将对象提供给fooSubject
。这里和您所做的最大区别在于它创建了一个 Foo 管道,并按顺序将 Foo 对象输入其中,而不是创建并行管道。它也是一个线性组合(无嵌套),可以像您需要的那样使用 .Zip
传播上游值
你是指哪个.map
? future1.map
只是将一个结果 - 一个数组 - 映射到另一个对象数组中。您可以修改您的部分对象,但是在将它们发送到他们自己的管道之前,您可以根据需要修改它们,尽管以 Never 的失败类型开始(这是有道理的,因为它们是真实的对象),不需要在没有失败类型的情况下结束。实际上,它允许您根据需要处理每个单独对象的错误,或者不处理它,视情况而定,并返回 `Resultcollect
它们。但是通过嵌套这些,我们可以在不使用元组的情况下将它运行到我们需要的深度,因为我们现在拥有与 DispatchQueue 相同的 Swift 范围规则。让我尝试将我的代码转换为这种结构,然后看看...!
是的,这正是我需要的模式。这个基本想法——flatMap
到 Publishers.Sequence,处理每个项目以基于它的 Future 开始,并以 collect
结束——然后根据需要嵌套它们,这是我希望的干净外观。基本上是我用for...in
和一个DispatchGroup制作的同一个巢,它做同样的事情:在每个级别,我们不会通过collect
,直到前面的东西改变了每个元素的原始数组。这就是我的问题所要求的!以上是关于组合框架:如何在继续之前异步处理数组的每个元素的主要内容,如果未能解决你的问题,请参考以下文章