是否有一些优雅的方式来暂停和恢复任何其他 goroutine?
Posted
技术标签:
【中文标题】是否有一些优雅的方式来暂停和恢复任何其他 goroutine?【英文标题】:Is there some elegant way to pause and resume any other goroutine? 【发布时间】:2013-04-12 15:46:34 【问题描述】:就我而言,我有数千个 goroutine 同时作为 work()
工作。我还有一个sync()
goroutine。当sync
启动时,我需要任何其他 goroutine 在同步作业完成后暂停一段时间。这是我的代码:
var channels []chan int
var channels_mutex sync.Mutex
func work()
channel := make(chan int, 1)
channels_mutex.Lock()
channels = append(channels, channel)
channels_mutex.Unlock()
for
for
sync_stat := <- channel // blocked here
if sync_stat == 0 // if sync complete
break
// Do some jobs
if (some condition)
return
func sync()
channels_mutex.Lock()
// do some sync
for int i := 0; i != len(channels); i++
channels[i] <- 0
channels_mutex.Unlock()
现在的问题是,由于<-
总是在读取时阻塞,所以每次去sync_stat := <- channel
都是阻塞的。我知道如果频道关闭了它不会被阻止,但是由于我必须在work()
退出之前使用这个频道,并且我没有找到任何方法来重新打开已关闭的频道。
我怀疑自己走错路了,因此感谢您的帮助。是否有一些“优雅”的方式来暂停和恢复任何其他 goroutine?
【问题讨论】:
【参考方案1】:如果我理解正确,您需要 N 个工人和一个控制器,可以随意暂停、恢复和停止工人。下面的代码就可以做到这一点。
package main
import (
"fmt"
"runtime"
"sync"
)
// Possible worker states.
const (
Stopped = 0
Paused = 1
Running = 2
)
// Maximum number of workers.
const WorkerCount = 1000
func main()
// Launch workers.
var wg sync.WaitGroup
wg.Add(WorkerCount + 1)
workers := make([]chan int, WorkerCount)
for i := range workers
workers[i] = make(chan int, 1)
go func(i int)
worker(i, workers[i])
wg.Done()
(i)
// Launch controller routine.
go func()
controller(workers)
wg.Done()
()
// Wait for all goroutines to finish.
wg.Wait()
func worker(id int, ws <-chan int)
state := Paused // Begin in the paused state.
for
select
case state = <-ws:
switch state
case Stopped:
fmt.Printf("Worker %d: Stopped\n", id)
return
case Running:
fmt.Printf("Worker %d: Running\n", id)
case Paused:
fmt.Printf("Worker %d: Paused\n", id)
default:
// We use runtime.Gosched() to prevent a deadlock in this case.
// It will not be needed of work is performed here which yields
// to the scheduler.
runtime.Gosched()
if state == Paused
break
// Do actual work here.
// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int)
// Start workers
setState(workers, Running)
// Pause workers.
setState(workers, Paused)
// Unpause workers.
setState(workers, Running)
// Shutdown workers.
setState(workers, Stopped)
// setState changes the state of all given workers.
func setState(workers []chan int, state int)
for _, w := range workers
w <- state
【讨论】:
` 使用通道进行通信是 Go 语言的惯用方式。如果需要,您可以使用全局变量。但我建议不要使用sync.Mutex
来锁定它。在处理大量 goroutines 时,它们的扩展性不是很好,每个 goroutines 都获取 R/W 锁。在这种情况下,我将使用 sync/atomic
包以原子方式读取/写入状态。
@jimt 不错的例子;很好地提醒了频道阻塞时如何运行“默认”。我要补充一点,如果工作人员有时需要一段时间才能回来阅读状态,最好让与他们交谈的通道的缓冲区为 1,这样你就可以匆忙写给他们所有人在 1 号停止之前不暂停或停止 2 号,等等。还是我误解了它是如何工作的?
也许我遗漏了一些东西,但暂停的工作人员不会继续运行(由于选择的默认部分)。我认为这会导致很多 unnec。线程上下文切换。
是的,它将继续进入默认情况,但它并不像您预期的那样昂贵。它的作用是将state
设置为适当的值。由您决定是否以及如何限制循环。至于上下文切换,here's 非常详尽地解释了它在 Go 中的工作原理。以上是关于是否有一些优雅的方式来暂停和恢复任何其他 goroutine?的主要内容,如果未能解决你的问题,请参考以下文章