Swift 5.5 并发:如何序列化异步任务以用 maxConcurrentOperationCount = 1 替换 OperationQueue?

Posted

技术标签:

【中文标题】Swift 5.5 并发:如何序列化异步任务以用 maxConcurrentOperationCount = 1 替换 OperationQueue?【英文标题】:Swift 5.5 Concurrency: how to serialize async Tasks to replace an OperationQueue with maxConcurrentOperationCount = 1? 【发布时间】:2021-12-31 08:48:17 【问题描述】:

我目前正在迁移我的应用程序以使用 Swift 中的并发模型。我想序列化任务以确保它们一个接一个地执行(没有并行性)。在我的用例中,我想收听 NotificationCenter 发布的通知,并在每次发布新通知时执行一个任务。但我想确保没有以前的任务正在运行。这相当于使用 maxConcurrentOperationCount = 1 的 OperationQueue。

例如,我在我的应用程序中使用 CloudKit 和 Core Data,并使用持久历史跟踪来确定商店中发生了哪些变化。 在这个Synchronizing a Local Store to the Cloud 示例代码中,Apple 使用操作队列来处理历史处理任务(在 CoreDataStack 中)。此 OperationQueue 的最大操作数设置为 1。

private lazy var historyQueue: OperationQueue = 
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 1
    return queue
()

当收到一个 Core Data 通知时,一个新任务被添加到这个串行操作队列中。因此,如果收到很多通知,它们都会以串行方式一个接一个地执行。

@objc
func storeRemoteChange(_ notification: Notification) 
    // Process persistent history to merge changes from other coordinators.
    historyQueue.addOperation 
        self.processPersistentHistory()
    

在这个Loading and Displaying a Large Data Feed 示例代码中,Apple 使用任务来处理历史更改(在 QuakesProvider 中)。

// Observe Core Data remote change notifications on the queue where the changes were made.
notificationToken = NotificationCenter.default.addObserver(forName: .NSPersistentStoreRemoteChange, object: nil, queue: nil)  note in
    Task 
        await self.fetchPersistentHistory()
    

我觉得第二个项目有问题,因为任务可以按任何顺序发生,不一定按顺序发生(与第一个项目中 OperationQueue 作为 maxConcurrentOperationCount = 1 的情况相反)。

我们是否应该在某处使用演员来确保方法被串行调用?

我想过这样的实现,但我还不太适应:

actor PersistenceStoreListener 
    let historyTokenManager: PersistenceHistoryTokenManager = .init()
    private let persistentContainer: NSPersistentContainer

    init(persistentContainer: NSPersistentContainer) 
        self.persistentContainer = persistentContainer
    

    func processRemoteStoreChange() async 
        print("\(#function) called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
    

收到新通知时将调用 processRemoteStoreChange 方法的位置(AsyncSequence):

notificationListenerTask = Task 
   let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)
   
   for await _ in notifications 
        print("notificationListenerTask called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
        await self.storeListener?.processRemoteStoreChange()
    

【问题讨论】:

在 Swift Concurrency 中,顺序执行是正常情况,除非您使用 async let 或分离任务。在一个任务组中,迭代可以同时执行,但结果是按顺序等待的。在您的情况下,AsyncSequence 是正确的方法。 @vadian 所以你的意思是 Apple 使用 Task (developer.apple.com/documentation/coredata/…) 的第二个项目中的代码是以串行方式执行任务?不存在稍后安排的任务在另一个已安排的任务之前执行的风险? 是的,正如名称AsyncSequence 所暗示的那样,它是一个序列,并且序列顺序执行。但请随意尝试。 Hum Apple 没有在他们的代码中使用 AsyncSequence(这是我的建议)。但是即使是,也意味着通知是按顺序接收的,但是当我在Task中调用异步方法时,它们也会按顺序执行吗?我对不同任务的实际处理方式有点困惑。 @matt 你是对的,我很抱歉。 TaskGroup 按完成顺序返回项目。但是 AsyncSequence 按顺序返回项目。 【参考方案1】:

使用actor,它的同步方法已经为您协调好了,不需要其他任何东西。例如。在下面的 Instruments 屏幕快照中,前四个 路标是我等待该 actor 的同步方法的位置,绿色间隔表示该方法实际运行的时间。

现在,如果您正在调用 Actor 的异步方法,那么 Actor 重入会干扰您想要实现的“一次一个”行为。 (有关重入的讨论,请参阅 SE-0306。)请参阅后四个路标,我将其中的 actor 异步方法称为异步方法,结果间隔以橙色显示。

简而言之,如果您的 actor 仅执行同步任务(这是您的情况),那么 actor 会自动产生 maxConcurrentOperationCount = 1 类型的行为。


FWIW,这不是很相关,但这是我的 minalist actor 用于上述内容:

actor ActorExample 
    func synchronousTask() 
        Thread.sleep(forTimeInterval: 2)
    

    func asynchronousTask() async throws 
        try await Task.sleep(nanoseconds: NSEC_PER_SEC * 2)
    

并分别调用它们,如下所示:

let example = ActorExample()

func startSynchronous() 
    Task(priority: .background) 
        os_signpost(.event, log: pointsOfInterest, name: #function)

        await example.synchronousTask()
    


func startAsynchronous() 
    Task(priority: .background) 
        os_signpost(.event, log: pointsOfInterest, name: #function)

        try await example.asynchronousTask()
    


你问:

如果您想了解有关processPersistentHistory 方法的更多详细信息,可以在本文中找到或多或少的内容:https://avanderlee.com/swift/persistent-history-tracking-core-data。您会看到它创建了一个新的核心数据backgroundContext 并执行一些操作,但以同步方式 (.performAndWait)。使用新的 Swift 并发模型和核心数据,这个 performAndWait 现在是 await context.perform ... 。但是如果我们使用Task await context.perform ... sync work ... ,我仍然不清楚如何将它们序列化,因为Task 可以在任意线程上运行。

上面我重点介绍了您问题标题中提出的问题,如何序列化Tasks。但如果问题真的是如何序列化perform(_:),看来您不必这样做。 documentation 表示它已经在队列上运行了已执行的请求:

在协调器的队列上异步执行闭包。

如果确实如此,那么您根本不必担心序列化它,因为 perform(_:) 会为您做到这一点。不需要额外的参与者或其他序列化机制。尽情享受 Swift 并发吧。

【讨论】:

感谢@Rob 的精彩解释。它巩固了我对演员的了解。 “processPersistentHistory”方法是异步的,因为它需要在 NSManagedObjectContext 线程上运行。所以我们需要‘await context.perform ’来确保执行主体在正确的线程上执行。这当然是如果我们想使用新的 async/await,因为我仍然可以使用 performAndWait 方法来同步运行主体。 不要将您在另一个线程(例如,在您的操作队列)上运行它的事实与processPersistentHistory 本身是否是异步的问题混为一谈。它没有完成处理程序并且你做了一个简单的addOperation,这表明它是一个同步方法,你只是在后台线程上异步运行。但是,如果方法本身是异步的,那么您分享的 addOperation 示例将无法正常工作。 如果您想了解更多有关processPersistentHistory 方法的详细信息,或多或少可以在本文中找到:avanderlee.com/swift/persistent-history-tracking-core-data。你会看到它创建了一个新的 Core Data backgroundContext 并执行了一些操作,但是是以同步的方式(.performAndWait)。使用新的 Swift 并发模型和核心数据,这个 performAndWait 现在是 await context.perform 。但是如果我们使用Task await context.perform ... sync work ... ,我仍然不清楚如何序列化这些,因为任务可以在任意线程上运行。 见上面修改后答案的结尾。

以上是关于Swift 5.5 并发:如何序列化异步任务以用 maxConcurrentOperationCount = 1 替换 OperationQueue?的主要内容,如果未能解决你的问题,请参考以下文章

Swift之深入解析如何使用并发系统并行运行多个任务

在 iOS swift 中异步/并发/并行运行任务

iOS(Swift) TaskProtocol异步任务队列

Swift异步序列构造器AsyncStream内部定时器(Timer)无法被触发的解决

Swift异步序列构造器AsyncStream内部定时器(Timer)无法被触发的解决

Swift 5.5 async let - 错误:表达式为“异步”但未标记为“等待”