使用通道作为队列的死锁

Posted

技术标签:

【中文标题】使用通道作为队列的死锁【英文标题】:Deadlock using channels as queues 【发布时间】:2021-08-20 23:37:15 【问题描述】:

我正在学习 Go,我正在尝试实现一个作业队列。

我想做的是:

让主要的 goroutine 通过一个通道为多个解析器提供行(将一行解析为 s 结构),并让每个解析器将结构发送到其他工作器(goroutine)将处理的结构通道(发送到数据库等)。

代码如下:

lineParseQ := make(chan string, 5)
jobProcessQ := make(chan myStruct, 5)
doneQ := make(chan myStruct, 5)

fileName := "myfile.csv"

file, err := os.Open(fileName)
if err != nil 
    log.Fatal(err)


defer file.Close()

reader := bufio.NewReader(file)

// Start line parsing workers and send to jobProcessQ
for i := 1; i <= 2; i++ 
    go lineToStructWorker(i, lineParseQ, jobProcessQ)


// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ 
    go WorkerProcessStruct(i, jobProcessQ, doneQ)


lineCount := 0 
countSend := 0

for 
    line, err := reader.ReadString('\n')
    
    if err != nil && err != io.EOF 
        log.Fatal(err)
    
    
    if err == io.EOF 
        break
    
    
    lineCount++
    
    if lineCount > 1 
        countSend++
        lineParseQ <- line[:len(line)-1]    // Avoid last char '\n'
    



for i := 0; i < countSend; i++ 
    fmt.Printf("Received %+v.\n", <-doneQ)


close(doneQ)
close(jobProcessQ)
close(lineParseQ)

这是一个简化的游乐场:https://play.golang.org/p/yz84g6CJraa

工人看起来像这样:

func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct ) 

    for j := range lineQ 
        strQ <- lineToStruct(j) // just parses the csv to a struct...
    



func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) 

    for a := range strQ 
        time.Sleep(time.Millisecond * 500) // fake long operation...
        done <- a
    

我知道问题与“完成”频道有关,因为如果我不使用它,就没有错误,但我不知道如何解决它。

【问题讨论】:

【参考方案1】:

在您将所有行发送到lineParseQ 之前,您不会开始doneQ 读取,这比缓冲区空间还多.因此,一旦doneQ 缓冲区已满,发送块就会开始填充lineParseQ 缓冲区,一旦已满,它就会死锁。将发送到lineParseQ 的循环、从doneQ 读取的循环或两者都移动到单独的goroutine,例如:

go func() 
    for _, line := range lines 
        countSend++
        lineParseQ <- line
    
    close(lineParseQ)
()

这最终仍然会死锁,因为你在一个频道上有一个range,在同一个goroutine中它后面有一个close;由于range 一直持续到通道关闭,并且在range 完成后关闭,所以您仍然存在死锁。你需要把关门放在合适的地方;也就是说,在发送例程中,或者在WaitGroup 上阻止,如果给定通道有多个发送者,则监视发送例程。

// Start line parsing workers and send to jobProcessQ
wg := new(sync.WaitGroup)
for i := 1; i <= 2; i++ 
    wg.Add(1)
    go lineToStructWorker(i, lineParseQ, jobProcessQ, wg)


// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ 
    go WorkerProcessStruct(i, jobProcessQ, doneQ)


countSend := 0

go func() 
    for _, line := range lines 
        countSend++
        lineParseQ <- line
    
    close(lineParseQ)
()

go func() 
    wg.Wait()
    close(jobProcessQ)
()

for a := range doneQ 
    fmt.Printf("Received %v.\n", a)


// ...

func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct, wg *sync.WaitGroup) 
    for j := range lineQ 
        strQ <- lineToStruct(j) // just parses the csv to a struct...
    
    wg.Done()


func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) 
    for a := range strQ 
        time.Sleep(time.Millisecond * 500) // fake long operation...
        done <- a
    
    close(done)

完整的工作示例:https://play.golang.org/p/XsnewSZeb2X

【讨论】:

谢谢!我为 lineParseQ 和 jobProcessQ 添加了一个更长的缓冲区(15),在消除睡眠后,我一直陷入僵局......具体的例子有数十万行,我打算使用 50 个 goroutines 之类的东西 此解决方案在所有结果处理完毕之前退出。 @Luis 增加缓冲区大小通常不能解决任何问题。您只需要有效地管理并发。正如 colm 指出的那样,我仍然有一个错误 - 我没有注意到有多个 WorkerProcessStruct 工人,所以它需要与 lineToStructWorker 相同的 WaitGroup 设置。我更新了 Playground 示例以解决该问题,它现在处理所有条目:play.golang.org/p/XsnewSZeb2X【参考方案2】:

sync.WaitGroup 协调管道,将每个部分分成多个阶段。当您知道管道的一部分已完成(并且没有人正在写入特定通道)时,关闭通道以指示所有“工人”退出,例如

var wg sync.WaitGroup
for i := 1; i <= 5; i++ 
    i := i
    wg.Add(1)
    go func() 
        Worker(i)
        wg.Done()
    ()


// wg.Wait() signals the above have completed

缓冲通道可以方便地处理突发工作负载,但有时它们用于避免糟糕设计中的死锁。如果您想避免在 goroutine 中运行管道的某些部分,您可以缓冲一些通道(通常与工作人员的数量相匹配)以避免主 goroutine 中的阻塞。

如果您有可读写的依赖片段并希望避免死锁 - 请确保它们位于单独的 goroutine 中。让管道的所有部分都有自己的 goroutine 甚至可以消除对缓冲通道的需求:

// putting all channel work into separate goroutines
// removes the need for buffered channels
lineParseQ := make(chan string, 0)
jobProcessQ := make(chan myStruct, 0)
doneQ := make(chan myStruct, 0)

当然,这是一个折衷方案——一个 goroutine 需要大约 2K 的资源——而缓冲通道的资源要少得多。与大多数设计一样,这取决于它的使用方式。

也不要被臭名昭著的 Go for-loop gotcha 抓住,所以使用闭包分配来避免这种情况:

for i := 1; i <= 5; i++ 
    i := i       // new i (not the i above)
    go func() 
        myfunc(i) // otherwise all goroutines will most likely get '5'
    ()

最后确保在退出之前等待所有结果处理完毕。 从基于通道的函数返回并认为所有结果都已处理是一个常见的错误。在服务中,这最终将成为现实。但在独立的可执行文件中,处理循环可能仍在处理结果。

go func() 
    wgW.Wait()   // waiting on worker goroutines to finish
    close(doneQ) // safe to close results channel now
()

// ensure we don't return until all results have been processed
for a := range doneQ 
    fmt.Printf("Received %v.\n", a)

通过在主 goroutine 中处理结果,我们确保我们不会在未处理完所有内容的情况下过早返回。

综合起来:

https://play.golang.org/p/MjLpQ5xglP3

【讨论】:

这里没有对go myfunc(i) 的竞争,因为myfunc 的参数在调用产生新的goroutine 之前被评估。 go func() use(i) () 发生竞争,因为 func() ... 序列之后的空括号意味着 no 参数被评估; use(i) 使用闭包 i。但是go myfunc(i) 导致i 被评估,并且没有传递闭包变量。 (不过,这很小,实际上并没有引入任何错误。) 啊,确实!在添加了 sync.WaitGroup 逻辑并切换到使用匿名函数之后,我一定是在投影。

以上是关于使用通道作为队列的死锁的主要内容,如果未能解决你的问题,请参考以下文章

为啥在使用等待组和通道时会出现死锁?

为啥在同一个 goroutine 中使用无缓冲通道会导致死锁?

死锁问题+使用通道时增加goroutine的数量

Go中缓冲通道的死锁

去通道和死锁

如何在没有超时/死锁的情况下在PROMELA进程中发送和接收?