Grand Central Dispatch 用于复杂流程?

Posted

技术标签:

【中文标题】Grand Central Dispatch 用于复杂流程?【英文标题】:Grand Central Dispatch for complex flow? 【发布时间】:2018-10-06 16:53:30 【问题描述】:

我有 a、b、c、d、e 耗时的任务函数和完成处理程序。

它们之间有约束:

    bc 都等待 a 完成 最后一个任务 e 等待 b & c & d 完成

如果没有任务d,我可以像这样快速编写代码(尚未测试)

let group = DispatchGroup()

group.enter()
a()  group.leave() 
group.wait()

group.enter()
b()  group.leave() 

group.enter()
c()  group.leave() 

group.notify(queue: .main) 
    e()

如何在不等待a完成的情况下添加任务d


编辑于 4/30 10:00 (+8)

代码不同

最简单的方法是使下载函数同步,并在其文档中添加一条警告,即永远不要从主线程调用它。

所以我根据它做了一个版本。这种方式无法处理来自并发调用的返回值。但它看起来真的很像 async/await。所以我现在很满意。谢谢各位。

async/await like 部分是

    myQueue.async 
        downloadSync("A")
        downloadSync("B", isConcurrent: true)
        downloadSync("C", isConcurrent: true)
        downloadSync("D", 4, isConcurrent: true)
        waitConcurrentJobs()
        downloadSync("E")
    

完整的代码如下。

    let myGroup = DispatchGroup()
    let myQueue = DispatchQueue(label: "for Sync/Blocking version of async functions")

    func waitConcurrentJobs() 
        myGroup.wait()
    

    // original function (async version, no source code)
    func download(_ something: String, _ seconds: UInt32 = 1, completionHandler: @escaping ()->Void = ) 
        print("Downloading \(something)")
        DispatchQueue.global().async 
            sleep(seconds)
            print("\(something) is downloaded")
            completionHandler()
        
    

    // wrapped function (synced version)
    // Warning:
    // It blocks current thead !!!
    // Do not call it on main thread
    func downloadSync(
        _ something: String,
        _ seconds: UInt32 = 1,
        isConcurrent: Bool = false
        )
        myGroup.enter()
        download(something, seconds)  myGroup.leave() 
        if !isConcurrent 
            myGroup.wait()
        
    

    // Now it really looks like ES8 async/await
    myQueue.async 
        downloadSync("A")
        downloadSync("B", isConcurrent: true)
        downloadSync("C", isConcurrent: true)
        downloadSync("D", 4, isConcurrent: true)
        waitConcurrentJobs()
        downloadSync("E")
    

结果

【问题讨论】:

对不起,我只想问一下,为什么不使用操作和依赖项。操作确实存在于 GCD 之上,并且依赖项可以完全满足您的要求。 因为我从来没有听说过他们。我正在开发我的第一个 Swift 应用程序(从 react native 切换)谢谢~我会用谷歌搜索它们。 :-) Swift 中的操作是 Apple 的 Foundation API 的一部分。它曾经是 NSOperation,但前段时间已重命名为 Operation。 developer.apple.com/documentation/foundation/operation 如果您必须将上一个任务的结果传递给下一个任务,使用Operation 将变得非常复杂和麻烦(== 容易出错),因为您需要同步数据传输(例如再次使用调度队列)。 请关注developer.apple.com/videos/play/wwdc2015/226 【参考方案1】:

编辑:最简单的方法是使download 函数同步,并在其文档中添加一条警告,即永远不要从主线程调用它。异步函数的厄运金字塔是coroutines were proposed 的原因,这正是 Swift 的创造者 Chris Lattner 所为。截至 2018 年 4 月,它还不是一个等待审核的正式提案,因此您很可能不会在 Swift 5 中看到它。

同步下载功能:

// Never call this from main thread
func download(_ something: String, _ seconds: UInt32 = 1, completionHandler: @escaping ()->Void = ) 
    let group = DispatchGroup()

    print("Downloading \(something)")
    group.enter()
    DispatchQueue.global().async 
        sleep(seconds)
        print("\(something) is downloaded")
        completionHandler()
        group.leave()
    
    group.wait()

还有NSOperation / NSOperationQueue 设置:

let opA = BlockOperation() 
    self.download("A")

let opB = BlockOperation() 
    self.download("B")

let opC = BlockOperation() 
    self.download("C")

let opD = BlockOperation() 
    self.download("D", 4)

let opE = BlockOperation() 
    self.download("E")


opB.addDependency(opA)
opC.addDependency(opA)

opE.addDependency(opB)
opE.addDependency(opC)
opE.addDependency(opD)

let operationQueue = OperationQueue()
operationQueue.addOperations([opA, opB, opC, opD, opE], waitUntilFinished: false)

【讨论】:

a, b, c, d 这里其实是一些异步函数。这种方式是否支持网络请求等异步功能? 我发现它不适用于异步函数。查看编辑后的帖子。 谢谢~我根据你的建议做了一个版本。它看起来很棒:-) 我最终决定否决这个解决方案,因为它无法通过为每个操作创建一个线程来浪费宝贵的资源来扩展,并且因为有一些解决方案实际上可以避免这种情况。当我们有协程时,我再次投票 ;) 我假设您有一些基准来支持该声明?【参考方案2】:

您最初的努力似乎与我非常接近。你可以做一个小的调整:使BCD成为完成触发E的组。

A 可能是另一组,但由于这是一项任务,我不明白这一点。完成后触发BC

请注意,与您的问题和其他答案中的某些示例代码不同,在下面的代码中,DA 可以立即启动并并行运行。

let q = DispatchQueue(label: "my-queue", attributes: .concurrent)
let g = DispatchGroup()
func taskA()   print("A")  
func taskB()   print("B"); g.leave()  
func taskC()   print("C"); g.leave()  
func taskD()   print("D"); g.leave()  
func taskE()   print("E")  
g.enter()
g.enter()
g.enter()
q.async 
    taskA()
    q.async(execute: taskB)
    q.async(execute: taskC)

q.async(execute: taskD)
g.notify(queue: q, execute: taskE)

【讨论】:

【参考方案3】:

您可以使用此框架来实现异步/等待模式 - https://github.com/belozierov/SwiftCoroutine

当你调用 await 时,它不会阻塞线程,只会挂起协程,所以你也可以在主线程中使用它。

func awaitAPICall(_ url: URL) throws -> String? 
    let future = URLSession.shared.dataTaskFuture(for: url)
    let data = try future.await().data
    return String(data: data, encoding: .utf8)


func load(url: URL) 
    DispatchQueue.main.startCoroutine 
        let result1 = try self.awaitAPICall(url)
        let result2 = try self.awaitAPICall2(result1)
        let result3 = try self.awaitAPICall3(result2)
        print(result3)
    

【讨论】:

【参考方案4】:

我想展示一个使用 Scala 的替代解决方案,例如期货:

let result = funcA().flatMap  resultA in
    return [funcB(param: resultA.0),
            funcC(param: resultA.1),
            funcD()]
        .fold(initial: [String]())  (combined, element) in
            return combined + [element]
    
.flatMap  result in
    return funcE(param: result)
.map  result in
    print(result)

基本上就是这样。它处理错误(隐式)并且是线程安全的。没有操作子类;)

注意,funcD 只有在 A 成功完成时才会被调用。由于funcA() 可能会失败,因此调用它是没有意义的。但如果需要,代码也可以很容易地进行调整以实现这一点。

请将此与我使用调度组和调度队列的其他解决方案中的函数foo() 进行比较。

下面是异步函数的定义示例,每个函数将其结果传递给下一个:

func funcA() -> Future<(String, String)> 
    print("Start A")
    let promise = Promise<(String, String)>()
    DispatchQueue.global().asyncAfter(deadline: .now() + 3) 
        print("Complete A")
        promise.complete(("A1", "A2"))
    
    return promise.future


func funcB(param: String) -> Future<String> 
    print("Start B")
    let promise = Promise<String>()
    DispatchQueue.global().asyncAfter(deadline: .now() + 1) 
        print("Complete B")
        promise.complete("\(param) -> B")
    
    return promise.future


func funcC(param: String) -> Future<String> 
    print("Start C")
    let promise = Promise<String>()
    DispatchQueue.global().asyncAfter(deadline: .now() + 2) 
        print("Complete C")
        promise.complete("\(param) -> C")
    
    return promise.future


func funcD() -> Future<String> 
    print("Start D")
    let promise = Promise<String>()
    DispatchQueue.global().asyncAfter(deadline: .now() + 4) 
        print("Complete D")
        promise.complete("D")
    
    return promise.future


func funcE(param: [String]) -> Future<String> 
    print("Start E")
    let promise = Promise<String>()
    DispatchQueue.global().asyncAfter(deadline: .now() + 4) 
        print("Complete E")
        promise.complete("\(param) -> E")
    
    return promise.future

将其打印到控制台:

Start A Complete A Start B Start C Start D Complete B Complete C Complete D Start E Complete E ["A1 -> B", "A2 -> C", "D"] -> E

提示:有几个 Future 和 Promise 库可用。

【讨论】:

对不起,我仍然难以理解 flatMap 或... 为什么 funcA 与其他人不同?这里的 Promise 和 Future 的库是什么? 当完成函数返回另一个未来时,你使用flatMap(基本上)像这样的期货。你使用map当它直接返回一个结果(即一个值或一个错误)。 map 返回完成函数结果的未来,其中 flatmap 返回一个由完成函数返回的未来最终结果解析的未来。 对于像期货这样的 scala,我推荐 BrightFutures(在 github 上)。但实际上还有一些看起来不错的。 Scala like futures 的优势在于,它们非常强大并且产生简洁的代码。不过,可能需要一段时间才能理解其背后的功能范式(map、flatMap 和其他组合器)。

以上是关于Grand Central Dispatch 用于复杂流程?的主要内容,如果未能解决你的问题,请参考以下文章

使用 Grand Central Dispatch 进行文件监控

暂停和恢复 Grand Central Dispatch 线程

swift Grand Central Dispatch(GCD)发送信号量示例

NSOperation 与 Grand Central Dispatch

使用 Grand Central Dispatch (GCD) 创建恰好 N 个线程

核心数据和线程/ Grand Central Dispatch