惯用的 goroutine 终止和错误处理

Posted

技术标签:

【中文标题】惯用的 goroutine 终止和错误处理【英文标题】:Idiomatic goroutine termination and error handling 【发布时间】:2017-04-10 02:54:38 【问题描述】:

我在 go 中有一个简单的并发用例,但我想不出一个优雅的解决方案来解决我的问题。

我想编写一个方法fetchAll,从远程服务器并行查询未指定数量的资源。如果任何提取失败,我想立即返回第一个错误。

我最初的实现泄露了 goroutine:

    package main

    import (
      "fmt"
      "math/rand"
      "sync"
      "time"
    )

    func fetchAll() error 
      wg := sync.WaitGroup
      errs := make(chan error)
      leaks := make(map[int]struct)
      defer fmt.Println("these goroutines leaked:", leaks)

      // run all the http requests in parallel
      for i := 0; i < 4; i++ 
        leaks[i] = struct
        wg.Add(1)
        go func(i int) 
          defer wg.Done()
          defer delete(leaks, i)

          // pretend this does an http request and returns an error
          time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
          errs <- fmt.Errorf("goroutine %d's error returned", i)
        (i)
      

      // wait until all the fetches are done and close the error
      // channel so the loop below terminates
      go func() 
        wg.Wait()
        close(errs)
      ()

      // return the first error
      for err := range errs 
        if err != nil 
          return err
        
      

      return nil
    

    func main() 
      fmt.Println(fetchAll())
    

游乐场:https://play.golang.org/p/Be93J514R5

通过阅读https://blog.golang.org/pipelines 我知道我可以创建一个信号通道来清理其他线程。或者,我可以使用context 来完成它。但似乎这样一个简单的用例应该有一个我所缺少的更简单的解决方案。

【问题讨论】:

【参考方案1】:

使用Error Group 使这更加简单。这会自动等待所有提供的 Go 例程成功完成,或者在任何一个例程返回错误的情况下取消所有剩余的例程(在这种情况下,该错误是返回给调用者的一个气泡)。

package main

import (
        "context"
        "fmt"
        "math/rand"
        "time"

        "golang.org/x/sync/errgroup"
)

func fetchAll(ctx context.Context) error 
        errs, ctx := errgroup.WithContext(ctx)

        // run all the http requests in parallel
        for i := 0; i < 4; i++ 
                errs.Go(func() error 
                        // pretend this does an http request and returns an error                                                  
                        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)                                               
                        return fmt.Errorf("error in go routine, bailing")                                                      
                )
        

        // Wait for completion and return the first error (if any)                                                                 
        return errs.Wait()


func main() 
        fmt.Println(fetchAll(context.Background()))

【讨论】:

+1,这太棒了。如此干净和地道。不过是一个小附录——“剩余的”goroutines 没有被 errgroup 触及。 Errgroup 通常只是等待它们完成。对于 fail-fast,每个 goroutine 应该有相同的 ctx 并积极关注 ctx.Done。换句话说,Go 语言中的动词“取消”并不意味着“杀死 goroutine”。 但是我们无法将参数传递给 errs.Go @here,如果在到达return errs.Wait() 之前返回了像 fetchAll() 这样的情况,是否会有任何 goroutine 泄漏 - (虽然这是一个不好的做法) 我喜欢这个,但有一件事引起了我的注意。如果您有一个想要传递给 Go 例程函数的变量怎么办。 errs.Go 需要没有参数的函数。您需要遵守关闭规则golang.org/doc/faq#closures_and_goroutines【参考方案2】:

除了一个你的 goroutine 外,所有的 goroutine 都被泄露了,因为它们仍在等待发送到 errs 通道——你永远不会完成清空它的 for-range。您还泄露了负责关闭 errs 通道的 goroutine,因为等待组永远不会完成。

(另外,正如 Andy 所指出的,从 map 中删除不是线程安全的,因此需要对互斥体进行保护。)

但是,我认为这里甚至不需要映射、互斥体、等待组、上下文等。我会重写整个事情,只使用基本的频道操作,如下所示:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func fetchAll() error 
    var N = 4
    quit := make(chan bool)
    errc := make(chan error)
    done := make(chan error)
    for i := 0; i < N; i++ 
        go func(i int) 
            // dummy fetch
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            err := error(nil)
            if rand.Intn(2) == 0 
                err = fmt.Errorf("goroutine %d's error returned", i)
            
            ch := done // we'll send to done if nil error and to errc otherwise
            if err != nil 
                ch = errc
            
            select 
            case ch <- err:
                return
            case <-quit:
                return
            
        (i)
    
    count := 0
    for 
        select 
        case err := <-errc:
            close(quit)
            return err
        case <-done:
            count++
            if count == N 
                return nil // got all N signals, so there was no error
            
        
    


func main() 
    rand.Seed(time.Now().UnixNano())
    fmt.Println(fetchAll())

游乐场链接:https://play.golang.org/p/mxGhSYYkOb

编辑:确实有一个愚蠢的错误,感谢您指出。我修复了上面的代码(我认为......)。我还为添加的 Realism™ 添加了一些随机性。

另外,我想强调的是,确实有多种方法可以解决这个问题,而我的解决方案只是一种方法。归根结底,这取决于个人品味,但总的来说,您希望努力实现“惯用”代码 - 以及一种让您感觉自然且易于理解的风格。

【讨论】:

ec := chan error(nil) 很有趣,我以前没见过这种模式。我认为select 原因是按随机顺序执行的。是否存在done&lt;-trueec&lt;-err 之前发送的竞争条件? 好球,绝对有比赛!我写的很匆忙,就像我提到的那样,没有测试它(你应该总是这样做)。幸运的是,修复错误只会使整个代码更加简单,在这种情况下,不需要 chan error(nil) 技巧(有时当您想要阻止来自 select 语句的发送时,这很有用,因此您不必编写多个条件选择)。感谢您指出我的错误:) 这可以进一步简化。您不需要单独的完成和错误通道,还有其他一些事情需要改进。 play.golang.org/p/1a0ZXuy3Dz @MennoSmits 您的简化很有趣,事实上我想知道为什么我们甚至需要quit 频道和select 声明? play.golang.org/p/PmovudTviF quit 频道对于在其中一个失败时立即停止所有 goroutine 是必要的。 OP的问题陈述暗示了这一点。如果在一个失败时其他 goroutine 可以继续运行,那么您的解决方案就是正确的。【参考方案3】:

这是一个使用errgroup 的更完整示例,由joth 建议。它显示处理成功的数据,并会在第一个错误时退出。

https://play.golang.org/p/rU1v-Mp2ijo

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/errgroup"
    "math/rand"
    "time"
)

func fetchAll() error 
    g, ctx := errgroup.WithContext(context.Background())
    results := make(chan int)
    for i := 0; i < 4; i++ 
        current := i
        g.Go(func() error 
            // Simulate delay with random errors.
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            if rand.Intn(2) == 0 
                return fmt.Errorf("goroutine %d's error returned", current)
            
            // Pass processed data to channel, or receive a context completion.
            select 
            case results <- current:
                return nil
            // Close out if another error occurs.
            case <-ctx.Done():
                return ctx.Err()
            
        )
    

    // Elegant way to close out the channel when the first error occurs or
    // when processing is successful.
    go func() 
        g.Wait()
        close(results)
    ()

    for result := range results 
        fmt.Println("processed", result)
    

    // Wait for all fetches to complete.
    return g.Wait()


func main() 
    fmt.Println(fetchAll())


【讨论】:

那个“优雅的方式”评论让我几乎可以肯定额外的 goroutine 是一场比赛。现在我把它解释为只是一个小的额外生产者,它只会关上门。这理顺了我的直觉。与往常一样,生产者关闭而不是消费者。【参考方案4】:

只要每个 goroutine 完成,你就不会泄漏任何东西。您应该将错误通道创建为缓冲的缓冲区大小等于 goroutine 的数量,以便通道上的发送操作不会阻塞。每个 goroutine 应该总是在它完成时在通道上发送一些东西,无论它是成功还是失败。然后底部的循环可以迭代 goroutine 的数量,如果它得到一个非零错误则返回。您不需要 WaitGroup 或关闭通道的其他 goroutine。

我认为 goroutine 泄漏的原因是当你遇到第一个错误时返回,所以其中一些仍在运行。

顺便说一下,map 不是 goroutine 安全的。如果你在 goroutines 之间共享一个 map 并且其中一些正在对 map 进行更改,你需要使用互斥锁来保护它。

【讨论】:

我同意使用缓冲通道是可行的,但我试图避免这种解决方案,尽管事先并不容易知道提取次数(实际代码比示例更复杂)。 【参考方案5】:

此答案包括将回复返回到 doneData 的能力 -

package main

import (
    "fmt"
    "math/rand"
    "os"
    "strconv"
)

var doneData []string // responses

func fetchAll(n int, doneCh chan bool, errCh chan error) 
    partialDoneCh := make(chan string)

    for i := 0; i < n; i++ 
        go func(i int) 

            if r := rand.Intn(100); r != 0 && r%10 == 0 
                // simulate an error
                errCh <- fmt.Errorf("e33or for reqno=" + strconv.Itoa(r))
             else 
                partialDoneCh <- strconv.Itoa(i)
            
        (i)
    

    // mutation of doneData
    for d := range partialDoneCh 
        doneData = append(doneData, d)
        if len(doneData) == n 
            close(partialDoneCh)
            doneCh <- true
        
    


func main() 
    // rand.Seed(1)
    var n int
    var e error
    if len(os.Args) > 1 
        if n, e = strconv.Atoi(os.Args[1]); e != nil 
            panic(e)
        
     else 
        n = 5
    

    doneCh := make(chan bool)
    errCh := make(chan error)

    go fetchAll(n, doneCh, errCh)
    fmt.Println("main: end")

    select 
    case <-doneCh:
        fmt.Println("success:", doneData)
    case e := <-errCh:
        fmt.Println("failure:", e, doneData)
    

使用go run filename.go 50 执行,其中 N=50 即并行度

【讨论】:

这不是惯用的。这不是一个正确的答案,它不是问题,它没有利用语言提供的 dioms 来编写解决方案,这不符合 OP。此外,它是令人费解的,至少,doneDaa 是无用的,而 doneCh 使用适当的习语是完全可选的。 @mh-cbon 如果您提供了具体的更正,将会很有帮助。

以上是关于惯用的 goroutine 终止和错误处理的主要内容,如果未能解决你的问题,请参考以下文章

惯用节点错误处理

Clojure 中的惯用错误处理

在golang中处理逻辑错误与编程错误的惯用方法

如何设计 goroutines 程序来处理 api 限制错误

如何使用适用于 DynamoDb 的 AWS Rust 开发工具包编写惯用的 Rust 错误处理?

如果成功,从没有结果的函数返回错误的惯用方法是啥?