基本的 goroutine 和通道模式:一个通道的多个 goroutine

Posted

技术标签:

【中文标题】基本的 goroutine 和通道模式:一个通道的多个 goroutine【英文标题】:Basic goroutine and channel pattern: multiple goroutines for one channel 【发布时间】:2020-08-02 15:55:55 【问题描述】:

我是 Go 新手,想知道一些我无法弄清楚的非常基本的问题。

只是为了锻炼(真正需要的抽象),我需要:

用常量ITERATIONS 固定的一些元素初始化字符串切片 遍历这个切片并为每个元素运行一个 goroutine 每个 goroutine 将花费一定的时间来处理元素(随机的第二个持续时间) 工作完成后,我希望 goroutine 将结果推送到通道 然后我需要从这个通道(在从主 goroutine 调用的函数中)捕获所有结果,将它们附加到最终切片,然后,一旦完成 打印最终切片的长度并添加一些基本时间跟踪

下面是一个有效的代码。

毫不奇怪,程序的总时间总是或多或少等于 const MAX_SEC_SLEEP 的值,因为所有处理 goroutine 都是并行工作的。

但是呢:

    接收部分:

我真的需要将我的 select 语句包装在一个 for 循环中,迭代确切数量的 ITERATIONS ,以获得与将结束通道的 goroutine 数量完全相同的接收器数量吗?这是避免死锁的唯一方法吗?如果由于某种原因,其中一个 goroutine 失败了怎么办?

我找不到一种方法来让一个简单的 for (ever) 循环包装选择,有两种情况(一种从结果通道接收,另一种像 case <-done 这样将从函数返回)。会是更好的模式吗?

或者迭代通道并检测它是否从某个地方关闭会更好?

    发送部分

在所有迭代之后,我应该在某个地方关闭频道吗?但我肯定会在至少一个 gouroutine 完成之前关闭它,以恐慌错误结束(试图发送到关闭的频道)

如果我要插入done <- true 模式,它会在这里吗?

    等待组

我并没有真正尝试过等待组,我需要一种方法来捕获来自 goroutine 的所有“返回”值并将它们附加到最终切片;除了使用通道之外,我没有找到从 goroutine 返回的正确方法。

    杂项

我应该在 func 参数中传递通道还是让它们按原样对程序全局化?

    (错误的)代码
package main

import (
    "fmt"
    "log"
    "math/rand"
    "time"
)

const ITERATIONS = 200

var (
    results   chan string
    initial   []string
    formatted []string
)

func main() 
    defer timeTrack(time.Now(), "program")

    format()  //run format goroutines
    receive() //receive formatted strings

    log.Printf("final slice contains %d/%d elements", len(formatted), len(initial))


//gets all results from channel and appends them to formatted slice
func receive() 
    for i := 0; i < ITERATIONS; i++ 
        select 
        case result := <-results:
            formatted = append(formatted, result)
        
    


//loops over initial slice and runs a goroutine per element
//that does some formatting operation and then pushes result to channel
func format() 
    for i := 0; i < ITERATIONS; i++ 
        go func(i int) 
            //simulate some formatting code that can take a while
            sleep := time.Duration(rand.Intn(10)) * time.Second
            time.Sleep(sleep)
            //append formatted string to result chan
            results <- fmt.Sprintf("%s formatted", initial[i])
        (i)
    



//initialize chans and inital slice
func init() 
    results = make(chan string, ITERATIONS)
    for i := 0; i < ITERATIONS; i++ 
        initial = append(initial, fmt.Sprintf("string #%d", i))
    


func timeTrack(start time.Time, name string) 
    elapsed := time.Since(start)
    log.Printf("%s took %s", name, elapsed)



【问题讨论】:

【参考方案1】:

我找不到一种方法来让一个简单的 for (ever) 循环包装选择,有两种情况(一种从结果通道接收,另一种类似于 case

如果通道在所有写入器完成后关闭,您可以使用简单的for ... range 循环:

for result := range ch 
    ... do something with the result ...

为了让这个简单的变体起作用,通道必须关闭,否则for 循环将不会终止。

在所有迭代之后,我应该在某个地方关闭频道吗?

如果可能的话,是的。

我并没有真正尝试过等待组 ...

sync.WaitGroup,或类似的东西,几乎可以肯定是去这里的方式。每个可以写入通道的goroutine都应该被初始考虑,例如:

var wg Sync.WaitGroup
wg.Add(ITERATIONS)

然后,您可以生成所有编写并让它们运行的​​ goroutine。每次运行时,它都会调用wg.Done() 表示它已完成。

然后你——某处; where 部分有点棘手——调用wg.Wait() 以等待所有编写器完成。当所有作者都表示完成后,您可以close()频道。

请注意,如果您从正在读取通道的 same 协程调用wg.Wait(),即运行for result := range ... 循环的协程,则会遇到问题:您不能同时读取从频道等待作者写入频道。所以你要么必须在循环结束后调用wg.Wait(),这为时已晚;或者在循环开始之前,这还为时过早。

这使问题及其解决方案变得清晰:您必须在一个 goroutine 中读取通道,并在另一个 goroutine 中执行等待然后关闭。这些 goroutine 最多可以有一个是最初进入 main 函数的主要 goroutine。

将 wait-and-then-close goroutine 设为自己的私有 goroutine 往往非常简单:

go func() 
    wg.Wait()
    close(results)
()

例如。

如果由于某种原因,其中一个 goroutine 失败了怎么办?

您需要在此处准确定义 fails 的含义,但如果您的意思是:如果被调用的 goroutine 本身调用了 panic 并因此没有到达它的 @ 怎么办? 987654337@ 呼叫,您可以使用defer 来确保即使在紧急情况下也会发生wg.Done()

func(args) 
    defer wg.Done()
    ... do the work ...


wg.Add(1)
go func(args) // `func` will definitely call `wg.Done`, even if `func` panics

我应该在 func 参数中传递通道还是让它们按原样对程序全局化?

在风格上,全局变量总是有点混乱。这并不意味着您不能使用它们;这取决于你,只要记住所有的权衡。闭包变量没有那么杂乱,但记得要小心for循环迭代变量:

for i := 0; i < 10; i++ 
    go func() 
        time.Sleep(50 * time.Millisecond)
        fmt.Println(i)  // BEWARE BUG: this prints 10, not 0-9
    ()

表现不佳。 Try this on the Go playground;请注意,go vet 现在在这里抱怨i 的使用不当。

我将您的原始示例代码带到了 Go Playground,并如上所述对其进行了最小的更改。结果是here。 (为了降低速度,我让睡眠等待 n 秒而不是 n 秒。)

【讨论】:

以上是关于基本的 goroutine 和通道模式:一个通道的多个 goroutine的主要内容,如果未能解决你的问题,请参考以下文章

golang Goroutine安全模式使用通道来抽象使用频道。

信号 goroutine 在通道关闭时停止

与通道和 goroutine 同步

使用带有选择的通道时的 Goroutine 死锁

Golang入门到项目实战 golang并发变成之通道channel

从 goroutine 通道读取而不阻塞