使用通道作为队列的死锁
Posted
技术标签:
【中文标题】使用通道作为队列的死锁【英文标题】:Deadlock using channels as queues 【发布时间】:2021-08-20 23:37:15 【问题描述】:我正在学习 Go,我正在尝试实现一个作业队列。
我想做的是:
让主要的 goroutine 通过一个通道为多个解析器提供行(将一行解析为 s 结构),并让每个解析器将结构发送到其他工作器(goroutine)将处理的结构通道(发送到数据库等)。
代码如下:
lineParseQ := make(chan string, 5)
jobProcessQ := make(chan myStruct, 5)
doneQ := make(chan myStruct, 5)
fileName := "myfile.csv"
file, err := os.Open(fileName)
if err != nil
log.Fatal(err)
defer file.Close()
reader := bufio.NewReader(file)
// Start line parsing workers and send to jobProcessQ
for i := 1; i <= 2; i++
go lineToStructWorker(i, lineParseQ, jobProcessQ)
// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++
go WorkerProcessStruct(i, jobProcessQ, doneQ)
lineCount := 0
countSend := 0
for
line, err := reader.ReadString('\n')
if err != nil && err != io.EOF
log.Fatal(err)
if err == io.EOF
break
lineCount++
if lineCount > 1
countSend++
lineParseQ <- line[:len(line)-1] // Avoid last char '\n'
for i := 0; i < countSend; i++
fmt.Printf("Received %+v.\n", <-doneQ)
close(doneQ)
close(jobProcessQ)
close(lineParseQ)
这是一个简化的游乐场:https://play.golang.org/p/yz84g6CJraa
工人看起来像这样:
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct )
for j := range lineQ
strQ <- lineToStruct(j) // just parses the csv to a struct...
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct)
for a := range strQ
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
我知道问题与“完成”频道有关,因为如果我不使用它,就没有错误,但我不知道如何解决它。
【问题讨论】:
【参考方案1】:在您将所有行发送到lineParseQ
之前,您不会开始从doneQ
读取,这比缓冲区空间还多.因此,一旦doneQ
缓冲区已满,发送块就会开始填充lineParseQ
缓冲区,一旦已满,它就会死锁。将发送到lineParseQ
的循环、从doneQ
读取的循环或两者都移动到单独的goroutine,例如:
go func()
for _, line := range lines
countSend++
lineParseQ <- line
close(lineParseQ)
()
这最终仍然会死锁,因为你在一个频道上有一个range
,在同一个goroutine中它后面有一个close
;由于range
一直持续到通道关闭,并且在range
完成后关闭,所以您仍然存在死锁。你需要把关门放在合适的地方;也就是说,在发送例程中,或者在WaitGroup
上阻止,如果给定通道有多个发送者,则监视发送例程。
// Start line parsing workers and send to jobProcessQ
wg := new(sync.WaitGroup)
for i := 1; i <= 2; i++
wg.Add(1)
go lineToStructWorker(i, lineParseQ, jobProcessQ, wg)
// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++
go WorkerProcessStruct(i, jobProcessQ, doneQ)
countSend := 0
go func()
for _, line := range lines
countSend++
lineParseQ <- line
close(lineParseQ)
()
go func()
wg.Wait()
close(jobProcessQ)
()
for a := range doneQ
fmt.Printf("Received %v.\n", a)
// ...
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct, wg *sync.WaitGroup)
for j := range lineQ
strQ <- lineToStruct(j) // just parses the csv to a struct...
wg.Done()
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct)
for a := range strQ
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
close(done)
完整的工作示例:https://play.golang.org/p/XsnewSZeb2X
【讨论】:
谢谢!我为 lineParseQ 和 jobProcessQ 添加了一个更长的缓冲区(15),在消除睡眠后,我一直陷入僵局......具体的例子有数十万行,我打算使用 50 个 goroutines 之类的东西 此解决方案在所有结果处理完毕之前退出。 @Luis 增加缓冲区大小通常不能解决任何问题。您只需要有效地管理并发。正如 colm 指出的那样,我仍然有一个错误 - 我没有注意到有多个WorkerProcessStruct
工人,所以它需要与 lineToStructWorker
相同的 WaitGroup
设置。我更新了 Playground 示例以解决该问题,它现在处理所有条目:play.golang.org/p/XsnewSZeb2X【参考方案2】:
与sync.WaitGroup
协调管道,将每个部分分成多个阶段。当您知道管道的一部分已完成(并且没有人正在写入特定通道)时,关闭通道以指示所有“工人”退出,例如
var wg sync.WaitGroup
for i := 1; i <= 5; i++
i := i
wg.Add(1)
go func()
Worker(i)
wg.Done()
()
// wg.Wait() signals the above have completed
缓冲通道可以方便地处理突发工作负载,但有时它们用于避免糟糕设计中的死锁。如果您想避免在 goroutine 中运行管道的某些部分,您可以缓冲一些通道(通常与工作人员的数量相匹配)以避免主 goroutine 中的阻塞。
如果您有可读写的依赖片段并希望避免死锁 - 请确保它们位于单独的 goroutine 中。让管道的所有部分都有自己的 goroutine 甚至可以消除对缓冲通道的需求:
// putting all channel work into separate goroutines
// removes the need for buffered channels
lineParseQ := make(chan string, 0)
jobProcessQ := make(chan myStruct, 0)
doneQ := make(chan myStruct, 0)
当然,这是一个折衷方案——一个 goroutine 需要大约 2K 的资源——而缓冲通道的资源要少得多。与大多数设计一样,这取决于它的使用方式。
也不要被臭名昭著的 Go for-loop gotcha 抓住,所以使用闭包分配来避免这种情况:
for i := 1; i <= 5; i++
i := i // new i (not the i above)
go func()
myfunc(i) // otherwise all goroutines will most likely get '5'
()
最后确保在退出之前等待所有结果处理完毕。 从基于通道的函数返回并认为所有结果都已处理是一个常见的错误。在服务中,这最终将成为现实。但在独立的可执行文件中,处理循环可能仍在处理结果。
go func()
wgW.Wait() // waiting on worker goroutines to finish
close(doneQ) // safe to close results channel now
()
// ensure we don't return until all results have been processed
for a := range doneQ
fmt.Printf("Received %v.\n", a)
通过在主 goroutine 中处理结果,我们确保我们不会在未处理完所有内容的情况下过早返回。
综合起来:
https://play.golang.org/p/MjLpQ5xglP3
【讨论】:
这里没有对go myfunc(i)
的竞争,因为myfunc
的参数在调用产生新的goroutine 之前被评估。 go func() use(i) ()
发生竞争,因为 func() ...
序列之后的空括号意味着 no 参数被评估; use(i)
使用闭包 i
。但是go myfunc(i)
导致i
被评估,并且没有传递闭包变量。
(不过,这很小,实际上并没有引入任何错误。)
啊,确实!在添加了 sync.WaitGroup 逻辑并切换到使用匿名函数之后,我一定是在投影。以上是关于使用通道作为队列的死锁的主要内容,如果未能解决你的问题,请参考以下文章