Apple Combine框架:如何并行执行多个发布者并等待所有发布者完成?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apple Combine框架:如何并行执行多个发布者并等待所有发布者完成?相关的知识,希望对你有一定的参考价值。

我正在发现组合。我编写了以“组合”方式发出HTTP请求的方法,例如:

func testRawDataTaskPublisher(for url: URL) -> AnyPublisher<Data, Error> {
    var request = URLRequest(url: url,
                             cachePolicy: .useProtocolCachePolicy,
                             timeoutInterval: 15)
    request.httpMethod = "GET"

    return urlSession.dataTaskPublisher(for: request)
        .tryMap {
            return $0.data
        }
        .eraseToAnyPublisher()
}

例如,我想多次调用该方法并完成一项任务:

let myURLs: [URL] = ...

for url in myURLs {
    let cancellable = testRawDataTaskPublisher(for: url)
        .sink(receiveCompletion: { _ in }) { data in
            // save the data...
        }
}

上面的代码无法正常工作,因为我必须将cancellable存储在属于该类的变量中。第一个问题是:将许多(例如1000个)可取消对象存储在Set<AnyCancellable>之类的东西中是个好主意吗?会不会导致内存泄漏?

var cancellables = Set<AnyCancellable>()

...

    let cancellable = ...

    cancellables.insert(cancellable) // ???

第二个问题是:所有可取消条件完成后如何启动任务?我在想类似的东西

class Test {
    var cancellables = Set<AnyCancellable>()

    func run() {
        // show a loader

        let cancellable = runDownloads()
            .receive(on: RunLoop.main)
            .sink(receiveCompletion: { _ in }) { _ in
                // hide the loader
            }

        cancellables.insert(cancellable)
    }

    func runDownloads() -> AnyPublisher<Bool, Error> {
        let myURLs: [URL] = ...

        return Future<Bool, Error> { promise in
            let numberOfURLs = myURLS.count
            var numberOfFinishedTasks = 0

            for url in myURLs {
                let cancellable = testRawDataTaskPublisher(for: url)
                    .sink(receiveCompletion: { _ in }) { data in
                        // save the data...
                        numberOfFinishedTasks += 1

                        if numberOfFinishedTasks >= numberOfURLs {
                            promise(.success(true))
                        }
                    }

                cancellables.insert(cancellable)
            }
        }.eraseToAnyPublisher()
    }

    func testRawDataTaskPublisher(for url: URL) -> AnyPublisher<Data, Error> {
        ...
    }
}

通常,我会使用DispatchGroup,启动多个HTTP任务并在任务完成时使用通知,但我想知道如何使用Combine以现代的方式编写它。

答案

您可以通过创建发布者集合,应用flatMap运算符,然后应用collect来等待所有发布者完成才能并行运行某些操作,然后再继续。这是您可以在操场上运行的示例:

以上是关于Apple Combine框架:如何并行执行多个发布者并等待所有发布者完成?的主要内容,如果未能解决你的问题,请参考以下文章

如何在机器人框架中并行运行多个测试套件上的多个测试用例 | Python

Combine

Combine如何驯服Future发布器的“怪癖”

使用 Swift 和 Combine 链接 + 压缩多个网络请求

将 .combine 与 cforest 一起使用时遇到问题

35 并行/发 同/异步 非/ 阻塞 进程的两种开启方式, 进程的常用方法及属性