在调度屏障中同时执行多个块
Posted
技术标签:
【中文标题】在调度屏障中同时执行多个块【英文标题】:Execute multiple blocks concurrently in a dispatch barrier 【发布时间】:2018-05-15 00:55:40 【问题描述】:我想向 Web 服务发送 2 种类型的请求。第一个是更改后端状态的 POST。第二个是从后端检索数据的 GET。我希望能够同时发送多个 POST 请求,因为它们不会导致不同步。但是,我希望与 POST 请求相关联地串行发送 GET 请求(当发送 GET 请求时,在没有收到 GET 响应时不能发送 POST 请求。我使用 GET 请求的调度屏障实现了这一点. 我的问题是,在执行一个 GET 请求时,我希望同时发送更多 GET 请求的选项以及在收到最后一个发送的 GET 请求的响应时打破障碍。 我一直在尝试使用调度障碍来完成这项工作,但到目前为止还没有找到解决方案。也许应该在其他地方寻找解决方案。
【问题讨论】:
【参考方案1】:如果使用GCD屏障方式,有两点需要注意:
在网络请求完成之前,您的分派任务不应完成(否则您正在同步请求的发出,而不是整个请求到响应的过程)。
你需要有三种类型的任务可以被分派:GETs(没有障碍),POSTs(没有障碍),和一些“切换”任务(有障碍)你每当您从 GET 切换到 POST 时使用。这个“切换”任务不需要做任何事情,但它就在那里,所以你有一个障碍。
因此,请跟踪最后一个请求是 GET 还是 POST,例如lastRequestType
,如果新任务不属于同一类型,那么在调度新的网络请求任务之前先调度你的“切换”屏障任务。
显然,“检查lastRequestType
,发出'switch'屏障并在必要时更新最后一个请求类型,并发出新请求”的整个过程需要同步以使其线程安全。
还有其他方法。例如,您可以使用操作队列(可能一个用于最近的 GET,一个用于最近的 POST),并使用依赖项来确保 POST 等待先前的 GET,反之亦然。同样,您将需要这个“最后一个请求类型是 GET 或 POST”变量,以了解您是否需要添加依赖项(同样,这一切都需要正确同步)。这种方法可以让您:
让你将异步网络请求包装在NSOperation
子类中,从而避免上面第1点的丑陋;和
允许您控制并发程度,这是我在处理大量网络请求时总是喜欢做的事情(尤其是当它们可能很慢时)。
如果你原谅我这么说,虽然上述两种方法都有效,但整个想法感觉设计过度了。我建议您非常努力地挑战 GET 和 POST 不能同时发生的前提。这个决定感觉有点武断,就像一些工程师在吐槽,寻找一个简单的解决方案,并建议 GET 和 POST 不应该同时发生。我会花一点时间弄清楚替代方案的外观,然后再实施上述方法。我们中的许多人使用并发 GET 和 POST 编写应用程序,但没有这种复杂性。
例如,如果我有一系列想要执行的 POST 请求,并且想要在完成后发出最终的 GET 请求,我可能会推荐 DispatchGroup
,其中使用组发送各个帖子,然后可以在您想要进行最终 GET 时收到通知:
let group = DispatchGroup()
networkQueue.async(group: group)
// POST 1
networkQueue.async(group: group)
// POST 2
group.notify(queue: networkQueue)
// final GET
或者,如果您使用简单的异步方法(没有信号量,从而避免不必要地阻塞线程),您仍然可以使用调度组:
let group = DispatchGroup()
group.enter()
performFirstPost
// POST 1 finished
group.leave()
group.enter()
performSecondPost
// POST 2 finished
group.leave()
group.notify(queue: networkQueue)
// final GET
【讨论】:
感谢您的详尽回答,以下是我的解释: 1. 为什么我需要这种同步?情况大致如下: GET 请求从后端检索项目列表。 POST 请求标记该列表中的一项(例如将其标记为收藏项)。如果您发送 POST 请求并且在收到响应之前发送 GET 请求,则无法保证后端接收这些请求的顺序以及应用程序接收响应的顺序. 最坏的情况是:1.POST请求,2.GET请求,3.后端收到GET请求,4.后端收到POST请求,5.应用收到GET响应,6.在应用程序中收到 POST 响应。在这种情况下,将会发生的情况是:用户将尝试将一个项目标记为收藏,然后请求刷新,获得该项目被标记为收藏的反馈,然后获得一个刷新列表,其中该项目未标记为收藏,而同一项目实际上在后端被标记为收藏。 2.我使用信号量使请求-响应过程成为一个阻塞过程。 3. 带有开关屏障任务的想法是一个非常有趣的想法。不幸的是,它不处理另一种特殊情况。每个 GET 请求-响应可能需要不同的时间来执行。我想要的是等待最新的,而忽略在最新的之前完成的。通过使用任何类型的屏障,我仍然必须等待旧的 GET 请求(可能比新的请求更长)完成,然后才能再次发送其他 POST 请求。 重新设置信号量以使其同步运行,这很棒。令人惊讶的是,我经常在 S.O. 上看到问题。他们不这样做的地方,他们想知道为什么它不等待响应。关于更大的问题,我的问题是为什么不在 POST 请求的完成处理程序中执行 GET 请求。那么你就不必担心了。或者,更好的是,如果您可以控制 Web 服务 API,让 POST 响应(如果您愿意,可以选择)从后续 GET 请求中返回您想要的内容,然后您消除整个请求并提高性能并解决问题。 在 POST 请求的完成处理程序中发送 GET 请求是不可行的,因为“我希望能够同时发送多个 POST 请求”。因此,应该在所有其他 POST 请求响应完成后发送 GET 请求。【参考方案2】:经过我非常感谢@Rob的大量思考和大量帮助,结果是最好不要使用GCD而使用OperationQueue。 This is a gist with a playground of the implementation along with the tests. 这是它的代码:
import PlaygroundSupport
import Foundation
final class OperationsManager
private let serialQueue = DispatchQueue(label: "serialQueue")
private let nonexclusiveOperationsQueue = OperationQueue()
private let exclusiveOperationQueue = OperationQueue()
func add(operation: Operation, exclusive: Bool)
serialQueue.async
if exclusive
self.exclusiveOperationQueue.cancelAllOperations()
print("Ignore the finish of the previous exclusive operation")
self.nonexclusiveOperationsQueue.operations.forEach
nonexclusiveOperation in
operation.addDependency(nonexclusiveOperation)
self.exclusiveOperationQueue.addOperation(operation)
else
self.exclusiveOperationQueue.operations.forEach
exclusiveOperation in
operation.addDependency(exclusiveOperation)
self.nonexclusiveOperationsQueue.addOperation(operation)
final class BlockedAsynchronousOperation: Operation
private var semaphore: DispatchSemaphore?
private let block: (@escaping () -> ()) -> ()
init(block: @escaping (@escaping () -> ()) -> ())
self.block = block
override func cancel()
super.cancel()
semaphore?.signal()
override func main()
super.main()
semaphore = DispatchSemaphore(value: 0)
block
[weak self] in
self?.semaphore?.signal()
semaphore!.wait()
///////////////////////////////////////////////////////////////////
func longRunningOperation(
seconds: Int, completionHandler: @escaping () -> ())
DispatchQueue.global().asyncAfter(
deadline: .now() + .seconds(seconds),
execute: completionHandler)
func blockedAsynchronousOperation(
withID id: String, seconds: Int) -> BlockedAsynchronousOperation
return BlockedAsynchronousOperation
unblockHandler in
print("Operation with ID: \(id) started")
longRunningOperation(seconds: seconds)
unblockHandler()
print("Operation with ID: \(id) finished")
func addOperation(
withID id: Int,
exclusive: Bool,
atSeconds startSeconds: Int,
duration: Int,
inOperationsManager operationsManager: OperationsManager)
let block =
operationsManager.add(operation:
blockedAsynchronousOperation(
withID: (exclusive ? "GET " : "POST ") +
String(id), seconds: duration), exclusive: exclusive)
if startSeconds > 0
DispatchQueue.global().asyncAfter(
deadline: .now() + .seconds(startSeconds), execute: block)
else
block()
///////////////////////////////////////////////////////////////////
print("global start\n")
let operationsManager = OperationsManager()
addOperation(
withID: 1,
exclusive: false,
atSeconds: 0,
duration: 7,
inOperationsManager: operationsManager)
addOperation(
withID: 2,
exclusive: false,
atSeconds: 0,
duration: 5,
inOperationsManager: operationsManager)
addOperation(
withID: 1,
exclusive: true,
atSeconds: 0,
duration: 10,
inOperationsManager: operationsManager)
addOperation(
withID: 2,
exclusive: true,
atSeconds: 3,
duration: 10,
inOperationsManager: operationsManager)
addOperation(
withID: 3,
exclusive: true,
atSeconds: 10,
duration: 10,
inOperationsManager: operationsManager)
addOperation(
withID: 3,
exclusive: false,
atSeconds: 15,
duration: 5,
inOperationsManager: operationsManager)
addOperation(
withID: 4,
exclusive: false,
atSeconds: 16,
duration: 5,
inOperationsManager: operationsManager)
addOperation(
withID: 4,
exclusive: true,
atSeconds: 28,
duration: 10,
inOperationsManager: operationsManager)
addOperation(
withID: 5,
exclusive: true,
atSeconds: 31,
duration: 20,
inOperationsManager: operationsManager)
addOperation(
withID: 6,
exclusive: true,
atSeconds: 34,
duration: 2,
inOperationsManager: operationsManager)
print("\nglobal end\n")
PlaygroundPage.current.needsIndefiniteExecution = true
【讨论】:
一些小观察: 1. 在add
中,您可以使用serialQueue.async
而不是sync
。 2.你的BlockedAsynchronousOperation
显然不是异步操作。这是一个同步操作。 (一个操作是否为asynchronous
,取决于它在操作完成之前是否从main
返回。)这里不重要,但它只是暗示了对异步操作的误解。如果您使用Operation
路由,我还想确保您从网络代码中删除信号量。但是有了这些细微的观察,这应该可以满足您的需求。
我的考虑: 1.访问是同步的,目的是锁定添加独占操作的过程,否则独占操作队列可能一次有多个操作。 2. 在我的具体实现中,连接到网络的部分被抽象出来(它只是对资源的请求,在一段时间后返回一个值)——这就是我不想包含网络代码的原因。我的想法是让 BlockedAsynchronousOperation 成为调度本身异步的进程并使其轻松同步的通用方法。
1.如果它是一个串行队列,并且如果您对同步对象所做的一切都在该队列中,则没有理由等待它。例如。在读写器模式中,读取是同步完成的(因为您必须等待值),但写入可以通过屏障异步完成。 2. 好的,但是创建一个名为BlockedAsynchronousOperation
这不是异步操作。 (你明白我的意思吗,“异步”操作具有非常具体的含义,这与它相对于主线程是异步的事实无关,而是main
?)
... BlockedAsynchronousOperation
演示的一个更好的例子可能是一个操作,它包装了一个像 GCD 计时器这样的东西(你安排计时器,main
/start
然后返回,然后你在计时器触发或操作被取消之前,不要将操作状态更改为finished
)。见Asynchronous vs Synchronous Operations。顺便说一句,请原谅相当次要的意见:但你确实问过我的想法。 ;)
@Rob,非常感谢您的想法,如果不是您的想法,解决方案甚至都不存在:)以上是关于在调度屏障中同时执行多个块的主要内容,如果未能解决你的问题,请参考以下文章