在调度屏障中同时执行多个块

Posted

技术标签:

【中文标题】在调度屏障中同时执行多个块【英文标题】:Execute multiple blocks concurrently in a dispatch barrier 【发布时间】:2018-05-15 00:55:40 【问题描述】:

我想向 Web 服务发送 2 种类型的请求。第一个是更改后端状态的 POST。第二个是从后端检索数据的 GET。我希望能够同时发送多个 POST 请求,因为它们不会导致不同步。但是,我希望与 POST 请求相关联地串行发送 GET 请求(当发送 GET 请求时,在没有收到 GET 响应时不能发送 POST 请求。我使用 GET 请求的调度屏障实现了这一点. 我的问题是,在执行一个 GET 请求时,我希望同时发送更多 GET 请求的选项以及在收到最后一个发送的 GET 请求的响应时打破障碍。 我一直在尝试使用调度障碍来完成这项工作,但到目前为止还没有找到解决方案。也许应该在其他地方寻找解决方案。

【问题讨论】:

【参考方案1】:

如果使用GCD屏障方式,有两点需要注意:

    在网络请求完成之前,您的分派任务不应完成(否则您正在同步请求的发出,而不是整个请求到响应的过程)。

    你需要有三种类型的任务可以被分派:GETs(没有障碍),POSTs(没有障碍),和一些“切换”任务(有障碍)你每当您从 GET 切换到 POST 时使用。这个“切换”任务不需要做任何事情,但它就在那里,所以你有一个障碍。

    因此,请跟踪最后一个请求是 GET 还是 POST,例如lastRequestType,如果新任务不属于同一类型,那么在调度新的网络请求任务之前先调度你的“切换”屏障任务。

    显然,“检查lastRequestType,发出'switch'屏障并在必要时更新最后一个请求类型,并发出新请求”的整个过程需要同步以使其线程安全。

还有其他方法。例如,您可以使用操作队列(可能一个用于最近的 GET,一个用于最近的 POST),并使用依赖项来确保 POST 等待先前的 GET,反之亦然。同样,您将需要这个“最后一个请求类型是 GET 或 POST”变量,以了解您是否需要添加依赖项(同样,这一切都需要正确同步)。这种方法可以让您:

让你将异步网络请求包装在NSOperation子类中,从而避免上面第1点的丑陋;和

允许您控制并发程度,这是我在处理大量网络请求时总是喜欢做的事情(尤其是当它们可能很慢时)。


如果你原谅我这么说,虽然上述两种方法都有效,但整个想法感觉设计过度了。我建议您非常努力地挑战 GET 和 POST 不能同时发生的前提。这个决定感觉有点武断,就像一些工程师在吐槽,寻找一个简单的解决方案,并建议 GET 和 POST 不应该同时发生。我会花一点时间弄清楚替代方案的外观,然后再实施上述方法。我们中的许多人使用并发 GET 和 POST 编写应用程序,但没有这种复杂性。

例如,如果我有一系列想要执行的 POST 请求,并且想要在完成后发出最终的 GET 请求,我可能会推荐 DispatchGroup,其中使用组发送各个帖子,然后可以在您想要进行最终 GET 时收到通知:

let group = DispatchGroup()

networkQueue.async(group: group) 
    // POST 1


networkQueue.async(group: group) 
    // POST 2


group.notify(queue: networkQueue) 
    // final GET

或者,如果您使用简单的异步方法(没有信号量,从而避免不必要地阻塞线程),您仍然可以使用调度组:

let group = DispatchGroup()

group.enter()
performFirstPost 
    // POST 1 finished
    group.leave()


group.enter()
performSecondPost 
    // POST 2 finished
    group.leave()


group.notify(queue: networkQueue) 
    // final GET

【讨论】:

感谢您的详尽回答,以下是我的解释: 1. 为什么我需要这种同步?情况大致如下: GET 请求从后端检索项目列表。 POST 请求标记该列表中的一项(例如将其标记为收藏项)。如果您发送 POST 请求并且在收到响应之前发送 GET 请求,则无法保证后端接收这些请求的顺序以及应用程序接收响应的顺序. 最坏的情况是:1.POST请求,2.GET请求,3.后端收到GET请求,4.后端收到POST请求,5.应用收到GET响应,6.在应用程序中收到 POST 响应。在这种情况下,将会发生的情况是:用户将尝试将一个项目标记为收藏,然后请求刷新,获得该项目被标记为收藏的反馈,然后获得一个刷新列表,其中该项目未标记为收藏,而同一项目实际上在后端被标记为收藏。 2.我使用信号量使请求-响应过程成为一个阻塞过程。 3. 带有开关屏障任务的想法是一个非常有趣的想法。不幸的是,它不处理另一种特殊情况。每个 GET 请求-响应可能需要不同的时间来执行。我想要的是等待最新的,而忽略在最新的之前完成的。通过使用任何类型的屏障,我仍然必须等待旧的 GET 请求(可能比新的请求更长)完成,然后才能再次发送其他 POST 请求。 重新设置信号量以使其同步运行,这很棒。令人惊讶的是,我经常在 S.O. 上看到问题。他们不这样做的地方,他们想知道为什么它不等待响应。关于更大的问题,我的问题是为什么不在 POST 请求的完成处理程序中执行 GET 请求。那么你就不必担心了。或者,更好的是,如果您可以控制 Web 服务 API,让 POST 响应(如果您愿意,可以选择)从后续 GET 请求中返回您想要的内容,然后您消除整个请求并提高性能并解决问题。 在 POST 请求的完成处理程序中发送 GET 请求是不可行的,因为“我希望能够同时发送多个 POST 请求”。因此,应该在所有其他 POST 请求响应完成后发送 GET 请求。【参考方案2】:

经过我非常感谢@Rob的大量思考和大量帮助,结果是最好不要使用GCD而使用OperationQueue。 This is a gist with a playground of the implementation along with the tests. 这是它的代码:

import PlaygroundSupport

import Foundation

final class OperationsManager 

    private let serialQueue = DispatchQueue(label: "serialQueue")

    private let nonexclusiveOperationsQueue = OperationQueue()
    private let exclusiveOperationQueue = OperationQueue()

    func add(operation: Operation, exclusive: Bool) 
        serialQueue.async 
            if exclusive 
                self.exclusiveOperationQueue.cancelAllOperations()
                print("Ignore the finish of the previous exclusive operation")

                self.nonexclusiveOperationsQueue.operations.forEach 
                    nonexclusiveOperation in
                    operation.addDependency(nonexclusiveOperation)
                
                self.exclusiveOperationQueue.addOperation(operation)
             else 
                self.exclusiveOperationQueue.operations.forEach 
                    exclusiveOperation in
                    operation.addDependency(exclusiveOperation)
                
                self.nonexclusiveOperationsQueue.addOperation(operation)
            
        
    



final class BlockedAsynchronousOperation: Operation 

    private var semaphore: DispatchSemaphore?
    private let block: (@escaping () -> ()) -> ()

    init(block: @escaping (@escaping () -> ()) -> ()) 
        self.block = block
    

    override func cancel() 
        super.cancel()
        semaphore?.signal()
    

    override func main() 
        super.main()

        semaphore = DispatchSemaphore(value: 0)
        block 
            [weak self] in
            self?.semaphore?.signal()
        
        semaphore!.wait()
    



///////////////////////////////////////////////////////////////////
func longRunningOperation(
    seconds: Int, completionHandler: @escaping () -> ()) 
    DispatchQueue.global().asyncAfter(
        deadline: .now() + .seconds(seconds),
        execute: completionHandler)


func blockedAsynchronousOperation(
    withID id: String, seconds: Int) -> BlockedAsynchronousOperation 
    return BlockedAsynchronousOperation 
        unblockHandler in
        print("Operation with ID: \(id) started")
        longRunningOperation(seconds: seconds) 
            unblockHandler()
            print("Operation with ID: \(id) finished")
        
    


func addOperation(
    withID id: Int,
    exclusive: Bool,
    atSeconds startSeconds: Int,
    duration: Int,
    inOperationsManager operationsManager: OperationsManager) 
    let block = 
        operationsManager.add(operation:
            blockedAsynchronousOperation(
                withID: (exclusive ? "GET " : "POST ") +
                    String(id), seconds: duration), exclusive: exclusive)
    
    if startSeconds > 0 
        DispatchQueue.global().asyncAfter(
            deadline: .now() + .seconds(startSeconds), execute: block)
     else 
        block()
    


///////////////////////////////////////////////////////////////////
print("global start\n")

let operationsManager = OperationsManager()

addOperation(
    withID: 1,
    exclusive: false,
    atSeconds: 0,
    duration: 7,
    inOperationsManager: operationsManager)
addOperation(
    withID: 2,
    exclusive: false,
    atSeconds: 0,
    duration: 5,
    inOperationsManager: operationsManager)
addOperation(
    withID: 1,
    exclusive: true,
    atSeconds: 0,
    duration: 10,
    inOperationsManager: operationsManager)
addOperation(
    withID: 2,
    exclusive: true,
    atSeconds: 3,
    duration: 10,
    inOperationsManager: operationsManager)
addOperation(
    withID: 3,
    exclusive: true,
    atSeconds: 10,
    duration: 10,
    inOperationsManager: operationsManager)
addOperation(
    withID: 3,
    exclusive: false,
    atSeconds: 15,
    duration: 5,
    inOperationsManager: operationsManager)
addOperation(
    withID: 4,
    exclusive: false,
    atSeconds: 16,
    duration: 5,
    inOperationsManager: operationsManager)
addOperation(
    withID: 4,
    exclusive: true,
    atSeconds: 28,
    duration: 10,
    inOperationsManager: operationsManager)
addOperation(
    withID: 5,
    exclusive: true,
    atSeconds: 31,
    duration: 20,
    inOperationsManager: operationsManager)
addOperation(
    withID: 6,
    exclusive: true,
    atSeconds: 34,
    duration: 2,
    inOperationsManager: operationsManager)

print("\nglobal end\n")

PlaygroundPage.current.needsIndefiniteExecution = true

【讨论】:

一些小观察: 1. 在add 中,您可以使用serialQueue.async 而不是sync。 2.你的BlockedAsynchronousOperation显然不是异步操作。这是一个同步操作。 (一个操作是否为asynchronous,取决于它在操作完成之前是否从main返回。)这里不重要,但它只是暗示了对异步操作的误解。如果您使用Operation 路由,我还想确保您从网络代码中删除信号量。但是有了这些细微的观察,这应该可以满足您的需求。 我的考虑: 1.访问是同步的,目的是锁定添加独占操作的过程,否则独占操作队列可能一次有多个操作。 2. 在我的具体实现中,连接到网络的部分被抽象出来(它只是对资源的请求,在一段时间后返回一个值)——这就是我不想包含网络代码的原因。我的想法是让 BlockedAsynchronousOperation 成为调度本身异步的进程并使其轻松同步的通用方法。 1.如果它是一个串行队列,并且如果您对同步对象所做的一切都在该队列中,则没有理由等待它。例如。在读写器模式中,读取是同步完成的(因为您必须等待值),但写入可以通过屏障异步完成。 2. 好的,但是创建一个名为BlockedAsynchronousOperation 这不是异步操作。 (你明白我的意思吗,“异步”操作具有非常具体的含义,这与它相对于主线程是异步的事实无关,而是main?) ... BlockedAsynchronousOperation 演示的一个更好的例子可能是一个操作,它包装了一个像 GCD 计时器这样的东西(你安排计时器,main/start 然后返回,然后你在计时器触发或操作被取消之前,不要将操作状态更改为finished)。见Asynchronous vs Synchronous Operations。顺便说一句,请原谅相当次要的意见:但你确实问过我的想法。 ;) @Rob,非常感谢您的想法,如果不是您的想法,解决方案甚至都不存在:)

以上是关于在调度屏障中同时执行多个块的主要内容,如果未能解决你的问题,请参考以下文章

linux进程的管理和调度 --- 调度相关

Servlet容器如何同时来处理多个请求

在 Objective C 中调度块的执行

个人线程记录

操作系统核心原理-3.进程原理(中):进程调度

多线程