如何创建队列并手动启动它

Posted

技术标签:

【中文标题】如何创建队列并手动启动它【英文标题】:How to create queue and start it manually 【发布时间】:2020-04-20 11:12:43 【问题描述】:

在我的应用程序中,我必须实现刷新令牌逻辑。我希望在刷新令牌过程中发送的所有请求都保存在队列中,一旦我的过程完成,我就会启动队列

例如,我想要这样的东西:

let queue = DispatchQueue(label: "myQueue", attributes: .concurrent)  

queue.async 
    // request One


queue.async 
    // request Two

当刷新令牌过程完成时:

queue.send()

【问题讨论】:

您是否正在寻找一个调度组? 你可以通过这样的函数来摆脱它:public func getAccessToken(completion: @escaping (token?) -> Void) ... 这比排队和拿着东西要简单得多。 【参考方案1】:

在我的应用程序中,我必须实现刷新令牌逻辑。我希望在刷新令牌过程中,所有发送的请求都保存在一个队列中,一旦我的过程完成,我就会启动队列

如果要创建队列并延迟其任务的启动,只需将其挂起即可,例如:

let queue = DispatchQueue(label: "myQueue", attributes: .concurrent)  
queue.suspend()

queue.async 
    // request One


queue.async 
    // request Two


fetchToken  result in
    switch result 
    case .success(let token):
        // do something with token
        print(token)
        queue.resume()

    case .failure(let error):
        // handle the error
        print(error)
    

这就是 suspendresume 调度队列的方式。请注意,suspend 仅阻止项目在队列中启动,但对已经运行的任务没有影响。这就是为什么我在向它分派项目之前暂停队列的原因。

但上述问题引出了你想在那个failure 场景中做什么的问题。你只是有一个队列坐在那里有一堆计划任务。从理论上讲,您可以保留对那些分派块的引用(通过使用DispatchWorkItem 模式而不仅仅是简单的闭包,您可以cancel 这些项目),但我可能会使用操作队列,例如

let queue = OperationQueue()
queue.isSuspended = true

queue.addOperation 
    // request One


queue.addOperation 
    // request Two


fetchToken  result in
    switch result 
    case .success(let token):
        // do something with token
        print(token)
        queue.isSuspended = false

    case .failure(let error):
        // handle the error
        print(error)
        queue.cancelAllOperations()
    

这和上面的一样,但是我们可以用cancelAllOperations取消所有那些排队的操作。


顺便说一句,您可以创建自定义Operation 子类来处理本身是异步的任务。而且我假设您的“请求一”和“请求二”是异步网络请求。请参阅 looking for a specific example where Operation is preferred over GCD or vice-versa 讨论何时可能更喜欢 OperationQueue 而不是 DispatchQueue

【讨论】:

【参考方案2】:

你可以像这样构建一个类

class ConcurrentQueue 

    typealias Task = () -> ()
    private var tasks: [Task] = []
    private let serialQueue = DispatchQueue(label: "Serial queue")

    func enqueueTask(_ task: @escaping Task) 
        serialQueue.sync 
            tasks.append(task)
        
    

    func runAndRemoveAllTasks() 
        serialQueue.sync 
            tasks.forEach  task in
                task()
            
            tasks.removeAll()
        
    

该类允许您将多个Task(s) 排队。

每个任务都是像() -> Void 这样的闭包。

当你将一个任务加入队列时它不会被执行,它只是被附加到内部数组中。

let concurrentQueue = ConcurrentQueue()

concurrentQueue.enqueueTask 
    print("1")


concurrentQueue.enqueueTask 
    print("2")


concurrentQueue.enqueueTask 
    print("3")

这段代码的结果只是保存了 3 个任务,它们没有被执行。

然后当你打电话时

concurrentQueue.runAndRemoveAllTasks()

所有的任务都被执行并在你得到的控制台中

1
2
3

线程安全

enqueueTask(_)runAndRemoveAllTasks() 方法是线程安全的。

事实上,它们只在内部与内部tasks 数组(不是隐式线程安全的)交互

serialQueue.sync 
   ...

这保证了对tasks 数组的一致访问。

【讨论】:

你的回答很好,但是我找到了使用BlockOperation的解决方案,它是专门针对这种情况制作的。 为什么要同步而不是异步?因为这是一个串行队列操作顺序,如果从中调用这些方法也不会阻塞主线程(GCD队列是线程安全的) @giorashc 好点。我使用了sync,因为我希望concurrentQueue.enqueueTask 的调用者确保在执行下一行之前附加任务。有意义吗? @giorashc - 你是对的:如果这个serialQueue 是为了同步与这个本地任务队列的交互,那么async 是正确的模式。更深层次的问题是runAndRemoveAllTasks 是同步的,如果这些任务可能很慢,那么您将长时间阻塞调用队列。另请注意,以上假设两个网络请求不会相对于彼此并发运行,并且 OP 明确指定队列是并发的。【参考方案3】:

解决方法是使用BlockOperation:

let pendingRequests = BlockOperation()

pendingRequests.addExecutionBlock 
    //Adding first request


pendingRequests.addExecutionBlock 
    //Adding second request

当刷新令牌完成时:

pendingRequests.start()

【讨论】:

我会非常警惕addExecutionBlock。例如,如果您在队列上使用maxConcurrentOperationCount,那是在操作级别,而不是执行块级别(即这两个执行块将同时运行而不管队列的maxConcurrentOperationCount)。或者,如果您只是 start 没有 OperationQueue 的操作,如图所示,请注意 start 是一个阻塞调用。执行块有其用途,但这可能不是最好的用例。

以上是关于如何创建队列并手动启动它的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ确认机制问题处理

在activemq中自动创建队列[关闭]

如何使用带有Jhipster的RabbitMQ创建新的队列?

使用队列时如何在张量流中训练期间测试网络

停止 Redis 后如何继续执行 celery 队列,然后再启动它?

如何将 Laravel 队列工作限制为一个执行结束出口?