是否有一些优雅的方式来暂停和恢复任何其他 goroutine?

Posted

技术标签:

【中文标题】是否有一些优雅的方式来暂停和恢复任何其他 goroutine?【英文标题】:Is there some elegant way to pause and resume any other goroutine? 【发布时间】:2013-04-12 15:46:34 【问题描述】:

就我而言,我有数千个 goroutine 同时作为 work() 工作。我还有一个sync() goroutine。当sync 启动时,我需要任何其他 goroutine 在同步作业完成后暂停一段时间。这是我的代码:

var channels []chan int
var channels_mutex sync.Mutex

func work() 
  channel := make(chan int, 1)
  channels_mutex.Lock()  
  channels = append(channels, channel)
  channels_mutex.Unlock()
  for 
    for 
      sync_stat := <- channel // blocked here
      if sync_stat == 0  // if sync complete
        break  
      
    
    // Do some jobs
    if (some condition) 
      return
    
  


func sync() 
  channels_mutex.Lock()
  // do some sync

  for int i := 0; i != len(channels); i++ 
    channels[i] <- 0
  
  channels_mutex.Unlock()

现在的问题是,由于&lt;- 总是在读取时阻塞,所以每次去sync_stat := &lt;- channel 都是阻塞的。我知道如果频道关闭了它不会被阻止,但是由于我必须在work()退出之前使用这个频道,并且我没有找到任何方法来重新打开已关闭的频道。

我怀疑自己走错路了,因此感谢您的帮助。是否有一些“优雅”的方式来暂停和恢复任何其他 goroutine?

【问题讨论】:

【参考方案1】:

如果我理解正确,您需要 N 个工人和一个控制器,可以随意暂停、恢复和停止工人。下面的代码就可以做到这一点。

package main

import (
    "fmt"
    "runtime"
    "sync"
)

// Possible worker states.
const (
    Stopped = 0
    Paused  = 1
    Running = 2
)

// Maximum number of workers.
const WorkerCount = 1000

func main() 
    // Launch workers.
    var wg sync.WaitGroup
    wg.Add(WorkerCount + 1)

    workers := make([]chan int, WorkerCount)
    for i := range workers 
        workers[i] = make(chan int, 1)

        go func(i int) 
            worker(i, workers[i])
            wg.Done()
        (i)
    

    // Launch controller routine.
    go func() 
        controller(workers)
        wg.Done()
    ()

    // Wait for all goroutines to finish.
    wg.Wait()


func worker(id int, ws <-chan int) 
    state := Paused // Begin in the paused state.

    for 
        select 
        case state = <-ws:
            switch state 
            case Stopped:
                fmt.Printf("Worker %d: Stopped\n", id)
                return
            case Running:
                fmt.Printf("Worker %d: Running\n", id)
            case Paused:
                fmt.Printf("Worker %d: Paused\n", id)
            

        default:
            // We use runtime.Gosched() to prevent a deadlock in this case.
            // It will not be needed of work is performed here which yields
            // to the scheduler.
            runtime.Gosched()

            if state == Paused 
                break
            

            // Do actual work here.
        
    


// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) 
    // Start workers
    setState(workers, Running)

    // Pause workers.
    setState(workers, Paused)

    // Unpause workers.
    setState(workers, Running)

    // Shutdown workers.
    setState(workers, Stopped)


// setState changes the state of all given workers.
func setState(workers []chan int, state int) 
    for _, w := range workers 
        w <- state
    

【讨论】:

` 使用通道进行通信是 Go 语言的惯用方式。如果需要,您可以使用全局变量。但我建议不要使用sync.Mutex 来锁定它。在处理大量 goroutines 时,它们的扩展性不是很好,每个 goroutines 都获取 R/W 锁。在这种情况下,我将使用 sync/atomic 包以原子方式读取/写入状态。 @jimt 不错的例子;很好地提醒了频道阻塞时如何运行“默认”。我要补充一点,如果工作人员有时需要一段时间才能回来阅读状态,最好让与他们交谈的通道的缓冲区为 1,这样你就可以匆忙写给他们所有人在 1 号停止之前不暂停或停止 2 号,等等。还是我误解了它是如何工作的? 也许我遗漏了一些东西,但暂停的工作人员不会继续运行(由于选择的默认部分)。我认为这会导致很多 unnec。线程上下文切换。 是的,它将继续进入默认情况,但它并不像您预期​​的那样昂贵。它的作用是将state 设置为适当的值。由您决定是否以及如何限制循环。至于上下文切换,here's 非常详尽地解释了它在 Go 中的工作原理。

以上是关于是否有一些优雅的方式来暂停和恢复任何其他 goroutine?的主要内容,如果未能解决你的问题,请参考以下文章

暂停和恢复自包含的scrapy脚本

从 cpp 中的父线程暂停和恢复线程

NSURLSessionDownloadTask 从其他 View ios 恢复和暂停

延迟暂停、早期恢复和唤醒锁。请解释

暂停和恢复背景音乐,而其他播放之间

我需要 TThreads 吗?如果是这样,我可以暂停、恢复和停止它们吗?