Codewalk之Golang并发代码回顾

Posted

技术标签:

【中文标题】Codewalk之Golang并发代码回顾【英文标题】:Golang Concurrency Code Review of Codewalk 【发布时间】:2021-08-30 21:27:38 【问题描述】:

我正在尝试了解 Golang 并发的最佳实践。我阅读了 O'Reilly 关于 Go 并发的书,然后又回到了 Golang Codewalks,特别是这个例子:

https://golang.org/doc/codewalk/sharemem/

这是我希望与您一起查看的代码,以便更多地了解 Go。我的第一印象是这段代码破坏了一些最佳实践。这当然是我(非常)没有经验的意见,我想讨论并获得对该过程的一些见解。这不是关于谁对谁错,请友善,我只是想分享我的观点并获得一些反馈。也许这个讨论会帮助其他人明白我为什么错了,并教给他们一些东西。

我完全清楚这段代码的目的是教初学者,而不是完美的代码。

问题 1 - 没有 Goroutine 清理逻辑

func main() 
    // Create our input and output channels.
    pending, complete := make(chan *Resource), make(chan *Resource)

    // Launch the StateMonitor.
    status := StateMonitor(statusInterval)

    // Launch some Poller goroutines.
    for i := 0; i < numPollers; i++ 
        go Poller(pending, complete, status)
    

    // Send some Resources to the pending queue.
    go func() 
        for _, url := range urls 
            pending <- &Resourceurl: url
        
    ()

    for r := range complete 
        go r.Sleep(pending)
    

main 方法无法清理 Goroutine,这意味着如果这是库的一部分,它们就会被泄露。

问题 2 - 作家没有产生频道

我认为作为最佳实践,createwritecleanup 通道的逻辑应该由单个实体控制(或实体组)。这背后的原因是作者在写到封闭的频道时会感到恐慌。因此,作者最好创建通道、写入通道并控制何时关闭通道。如果有多个写入器,它们可以与 WaitGroup 同步。

func StateMonitor(updateInterval time.Duration) chan<- State 
    updates := make(chan State)
    urlStatus := make(map[string]string)
    ticker := time.NewTicker(updateInterval)
    go func() 
        for 
            select 
            case <-ticker.C:
                logState(urlStatus)
            case s := <-updates:
                urlStatus[s.url] = s.status
            
        
    ()
    return updates

这个函数不应该负责创建更新通道,因为它是通道的读取者,而不是写入者。这个通道的作者应该创建它并将它传递给这个函数。基本上是对功能说“我将通过此渠道将更新传递给您”。但相反,这个函数正在创建一个通道,不清楚谁负责清理它。

问题 3 - 异步写入通道

这个函数:

func (r *Resource) Sleep(done chan<- *Resource) 
    time.Sleep(pollInterval + errTimeout*time.Duration(r.errCount))
    done <- r

在这里被引用:

for r := range complete 
    go r.Sleep(pending)

这似乎是个糟糕的主意。当这个通道关闭时,我们将有一个 goroutine 在我们无法到达的某个地方等待写入该通道。假设这个 goroutine 休眠了 1 小时,当它醒来时,它会尝试写入一个在清理过程中关闭的通道。这是为什么频道的编写者应该负责清理过程的另一个例子。在这里,我们有一个完全免费的作家,不知道频道何时关闭。

如果我错过了该代码中的任何问题(与并发相关),请列出它们。这不一定是一个客观的问题,如果您出于任何原因以不同的方式设计代码,我也有兴趣了解它。

这段代码的最大教训

对我来说,查看这段代码给我的最大教训是,通道的清理和写入通道必须同步。对于,它们必须相同或至少以某种方式进行通信(可能通过其他通道或原语)以避免写入已关闭的通道。

【问题讨论】:

Poller function 在做什么?如果这是 1 个文件中的简单应用程序,您可以使用 GoPlayground 链接更新帖子吗? 您可以享受Bryan Mills's GopherCon 2018 talk: Rethinking Classical Concurrency Patterns。 @jub0bs 那段视频让我大吃一惊。我从没想过 workerpool 会是个坏习惯!你还有其他类似的视频吗? @AFP_555 试试这个:Alan Shreve - Principles of designing Go APIs with channels (GopherCon India 2015)。 @AFP_555 和Peter Bourgon - Ways To Do Things (Release Party #GoSF 2017) 【参考方案1】:

    main方法,所以不需要清理。当main 返回时,程序退出。如果这不是main,那么您是正确的。

    没有适合所有用例的最佳实践。您在此处显示的代码是一种非常常见的模式。该函数创建一个 goroutine,并返回一个通道,以便其他人可以与该 goroutine 通信。没有规定必须如何创建频道的规则。但是,没有办法终止该 goroutine。这种模式非常适合的一个用例是从 数据库。通道允许流式传输数据,因为它是从 数据库。在这种情况下,通常有其他方法可以终止 不过 goroutine,就像传递上下文一样。

    同样,对于如何创建/关闭频道没有硬性规定。通道可以保持打开状态,不再使用时会被垃圾回收。如果用例需要,通道可以无限期开放,您担心的场景永远不会发生。

【讨论】:

但是我们不应该一直努力对我们创建的 goroutine 进行清理吗?假设今天这个逻辑是主要的,但明天我想在其他地方使用它。如果我事先没有考虑清理,这可能意味着要彻底改变代码的设计以使其可重用。 如果您正在处理需要清理的资源,则需要正常关闭。如果这段代码不在 main 中,它应该担心清理。 但如果不再需要它们,最好关闭它们,对吧? 不,不是。关闭通道通常用于传达数据流的结束。您不必关闭频道。换句话说:关闭通道并不会释放资源。该频道仍然坐在那里,上面挂着一个表明它已关闭的标志。 哇,我不知道关闭频道并没有清理资源。很高兴知道。因此,清理这些资源的唯一方法是当我们不再引用它并且 GC 清理它时,不管通道状态(打开、关闭)如何。【参考方案2】:
    当您询问此代码是否是库的一部分时,是的,在库函数内部生成没有清理的 goroutine 将是一种糟糕的做法。如果这些 goroutine 执行库的记录行为,那么调用者不知道该行为何时发生是有问题的。如果您有任何典型的“即发即弃”的行为,则应该由调用者选择何时忘记它。例如:
func doAfter5Minutes(f func()) 
   go func() 
       time.Sleep(5 * time.Minute)
       f()
       log.Println("done!")
   ()

有道理,对吧?当您调用该函数时,它会在 5 分钟后执行某些操作。问题是很容易像这样误用这个函数:

// do the important task every 5 minutes
for 
    doAfter5Minutes(importantTaskFunction)

乍一看,这似乎很好。我们每 5 分钟做一次重要的任务,对吧?实际上,我们正在非常快速地生成许多 goroutine,可能会在它们开始下降之前消耗所有可用内存。

我们可以实现某种回调或通道来在任务完成时发出信号,但实际上,函数应该简化如下:

func doAfter5Minutes(f func()) 
   time.Sleep(5 * time.Minute)
   f()
   log.Println("done!")

现在调用者可以选择如何使用它:

// call synchronously
doAfter5Minutes(importantTaskFunction)
// fire and forget
go doAfter5Minutes(importantTaskFunction)
    这个功能可以说也应该改变。正如您所说,作者应该有效地拥有频道,因为他们应该是关闭频道的人。这个通道读取函数坚持创建它读取的通道的事实实际上迫使自己进入上面提到的这种糟糕的“即发即弃”模式。注意该函数需要如何从通道中读取,但它还需要在读取之前返回通道。因此,它必须将读取行为放在一个新的、非托管的 goroutine 中,以允许自己立即返回通道。
func StateMonitor(updates chan State, updateInterval time.Duration) 
    urlStatus := make(map[string]string)
    ticker := time.NewTicker(updateInterval)
    defer ticker.Stop() // not stopping the ticker is also a resource leak

    for 
        select 
        case <-ticker.C:
            logState(urlStatus)
        case s := <-updates:
            urlStatus[s.url] = s.status
        
    


请注意,该功能现在更简单、更灵活且同步。前一个版本真正完成的唯一一件事是,它(大部分)保证StateMonitor 的每个实例都将拥有一个自己的通道,并且您不会遇到多个监视器竞争读取相同的情况渠道。虽然这可能可以帮助您避免某些类别的错误,但它也会使函数的灵活性大大降低,并且更有可能发生资源泄漏。

    我不确定我是否真的理解这个例子,但关闭频道的黄金法则是作者应该始终负责关闭频道。请牢记此规则,并注意有关此代码的几点:
Sleep 方法写入r Sleep 方法是并发执行的,没有任何方法可以跟踪正在运行的实例数量、它们处于什么状态等。

仅基于这些点,我们可以说程序中可能没有任何地方可以安全地关闭r,因为似乎无法知道它是否会再次使用。

【讨论】:

以上是关于Codewalk之Golang并发代码回顾的主要内容,如果未能解决你的问题,请参考以下文章

Golang 之 WaitGroup 源码解析

golang goroutine例子[golang并发代码片段]

Go语言学习之旅--并发编程

Go语言学习之旅--并发编程

Go语言学习之旅--并发编程

为啥添加并发会减慢这个 golang 代码?