如何在处理结果时正确关闭 Goroutines 中的共享通道
Posted
技术标签:
【中文标题】如何在处理结果时正确关闭 Goroutines 中的共享通道【英文标题】:How do I correctly close shared channels in Goroutines while processing results 【发布时间】:2020-03-19 22:48:25 【问题描述】:我正在尝试找到正确的方法来使用 worker go 例程产生的结果,同时在所有工作完成后优雅地退出结果循环。为了说明,我做了下面的例子。我的真实案例与这个示例略有不同,因为我不知道每个工作程序 go 例程将返回多少“工作”,显然这些 for 循环正在执行固定数量的结果 (5)。
我是 goroutines 和 channel 的新手,但以下是我理解的基本租户;
只有发件人才能关闭频道 在频道上执行range
将继续,直到频道关闭
package main
import (
"fmt"
"sync"
)
func worker1(r chan string, wg *sync.WaitGroup)
for i := 0; i < 5; i++
r <- fmt.Sprintf("1.%d", i)
wg.Done()
func worker2(r chan string, wg *sync.WaitGroup)
for i := 0; i < 5; i++
r <- fmt.Sprintf("2.%d", i)
wg.Done()
func main()
var wg sync.WaitGroup
r := make(chan string)
wg.Add(2)
go worker1(r, &wg)
go worker2(r, &wg)
for i := range r
fmt.Printf("Got job result: %s\n", i)
wg.Wait()
这个例子是死锁的,因为范围循环永远不会退出,因为通道永远不会关闭。工作完成后,我可以在通道上执行关闭(即将wg.Done()
替换为close(r)
),但是当另一个工作协程尝试将进一步的结果发送到已经关闭的通道时,我会感到恐慌。
最后我想我可以将wg.Wait()
移到结果循环上方,完成后关闭通道,然后开始打印结果,但这意味着在所有工作都完成之前我无法打印任何结果线程。
在所有工作线程完成后优雅地退出结果循环的正确方法是什么,而不是等到所有工作都完成后再开始打印结果?
【问题讨论】:
你go func() wg.Wait();close(r); ()
将wg
WaitGroup 视为writer 组 的计数器。您需要一组实体(统称为“作者”)写入通道并在完成后关闭它。为了使这简单,您可以拥有 n(目前 n==2)实际写作的“作家”,加上一个额外的非写作“作家组成员”,它只是等待其他作家说“完成”,然后执行close
。这是 mh-cbon 的评论和 Nick Corin 的回答中的匿名函数。
【参考方案1】:
我已经编辑了您的代码,使其可以在没有死锁的情况下工作。问题是在通道上接收阻塞了主线程,你的两个goroutines
都没有发送更多数据。
此解决方案运行一个新的goroutine
,它会在WaitGroup
完成后关闭结果通道。
package main
import (
"fmt"
"sync"
)
func worker1(r chan string, wg *sync.WaitGroup)
for i := 0; i < 5; i++
r <- fmt.Sprintf("1.%d", i)
wg.Done()
func worker2(r chan string, wg *sync.WaitGroup)
for i := 0; i < 5; i++
r <- fmt.Sprintf("2.%d", i)
wg.Done()
func main()
var wg sync.WaitGroup
r := make(chan string)
wg.Add(2)
go worker1(r, &wg)
go worker2(r, &wg)
go func()
defer close(r)
wg.Wait()
()
for i := range r
fmt.Printf("Got job result: %s\n", i)
(Go Playground)
【讨论】:
谢谢!对不起,我没有想到这么基本的东西。很感激,现在一切都说得通了。以上是关于如何在处理结果时正确关闭 Goroutines 中的共享通道的主要内容,如果未能解决你的问题,请参考以下文章
如何在Golang中实现正确的并行性? goroutines是否与Go1.5 +并行?
如何设计 goroutines 程序来处理 api 限制错误