基本的 goroutine 和通道模式:一个通道的多个 goroutine
Posted
技术标签:
【中文标题】基本的 goroutine 和通道模式:一个通道的多个 goroutine【英文标题】:Basic goroutine and channel pattern: multiple goroutines for one channel 【发布时间】:2020-08-02 15:55:55 【问题描述】:我是 Go 新手,想知道一些我无法弄清楚的非常基本的问题。
只是为了锻炼(真正需要的抽象),我需要:
用常量ITERATIONS
固定的一些元素初始化字符串切片
遍历这个切片并为每个元素运行一个 goroutine
每个 goroutine 将花费一定的时间来处理元素(随机的第二个持续时间)
工作完成后,我希望 goroutine 将结果推送到通道
然后我需要从这个通道(在从主 goroutine 调用的函数中)捕获所有结果,将它们附加到最终切片,然后,一旦完成
打印最终切片的长度并添加一些基本时间跟踪
下面是一个有效的代码。
毫不奇怪,程序的总时间总是或多或少等于 const MAX_SEC_SLEEP 的值,因为所有处理 goroutine 都是并行工作的。
但是呢:
-
接收部分:
我真的需要将我的 select 语句包装在一个 for 循环中,迭代确切数量的 ITERATIONS ,以获得与将结束通道的 goroutine 数量完全相同的接收器数量吗?这是避免死锁的唯一方法吗?如果由于某种原因,其中一个 goroutine 失败了怎么办?
我找不到一种方法来让一个简单的 for (ever) 循环包装选择,有两种情况(一种从结果通道接收,另一种像 case <-done
这样将从函数返回)。会是更好的模式吗?
或者迭代通道并检测它是否从某个地方关闭会更好?
-
发送部分
在所有迭代之后,我应该在某个地方关闭频道吗?但我肯定会在至少一个 gouroutine 完成之前关闭它,以恐慌错误结束(试图发送到关闭的频道)
如果我要插入done <- true
模式,它会在这里吗?
-
等待组
我并没有真正尝试过等待组,我需要一种方法来捕获来自 goroutine 的所有“返回”值并将它们附加到最终切片;除了使用通道之外,我没有找到从 goroutine 返回的正确方法。
-
杂项
我应该在 func 参数中传递通道还是让它们按原样对程序全局化?
-
(错误的)代码
package main
import (
"fmt"
"log"
"math/rand"
"time"
)
const ITERATIONS = 200
var (
results chan string
initial []string
formatted []string
)
func main()
defer timeTrack(time.Now(), "program")
format() //run format goroutines
receive() //receive formatted strings
log.Printf("final slice contains %d/%d elements", len(formatted), len(initial))
//gets all results from channel and appends them to formatted slice
func receive()
for i := 0; i < ITERATIONS; i++
select
case result := <-results:
formatted = append(formatted, result)
//loops over initial slice and runs a goroutine per element
//that does some formatting operation and then pushes result to channel
func format()
for i := 0; i < ITERATIONS; i++
go func(i int)
//simulate some formatting code that can take a while
sleep := time.Duration(rand.Intn(10)) * time.Second
time.Sleep(sleep)
//append formatted string to result chan
results <- fmt.Sprintf("%s formatted", initial[i])
(i)
//initialize chans and inital slice
func init()
results = make(chan string, ITERATIONS)
for i := 0; i < ITERATIONS; i++
initial = append(initial, fmt.Sprintf("string #%d", i))
func timeTrack(start time.Time, name string)
elapsed := time.Since(start)
log.Printf("%s took %s", name, elapsed)
【问题讨论】:
【参考方案1】:我找不到一种方法来让一个简单的 for (ever) 循环包装选择,有两种情况(一种从结果通道接收,另一种类似于 case
如果通道在所有写入器完成后关闭,您可以使用简单的for ... range
循环:
for result := range ch
... do something with the result ...
为了让这个简单的变体起作用,通道必须关闭,否则for
循环将不会终止。
在所有迭代之后,我应该在某个地方关闭频道吗?
如果可能的话,是的。
我并没有真正尝试过等待组 ...
sync.WaitGroup
,或类似的东西,几乎可以肯定是去这里的方式。每个可以写入到通道的goroutine都应该被初始考虑,例如:
var wg Sync.WaitGroup
wg.Add(ITERATIONS)
然后,您可以生成所有编写并让它们运行的 goroutine。每次运行时,它都会调用wg.Done()
表示它已完成。
然后你——某处; where 部分有点棘手——调用wg.Wait()
以等待所有编写器完成。当所有作者都表示完成后,您可以close()
频道。
请注意,如果您从正在读取通道的 same 协程调用wg.Wait()
,即运行for result := range ...
循环的协程,则会遇到问题:您不能同时读取从频道和等待作者写入频道。所以你要么必须在循环结束后调用wg.Wait()
,这为时已晚;或者在循环开始之前,这还为时过早。
这使问题及其解决方案变得清晰:您必须在一个 goroutine 中读取通道,并在另一个 goroutine 中执行等待然后关闭。这些 goroutine 最多可以有一个是最初进入 main
函数的主要 goroutine。
将 wait-and-then-close goroutine 设为自己的私有 goroutine 往往非常简单:
go func()
wg.Wait()
close(results)
()
例如。
如果由于某种原因,其中一个 goroutine 失败了怎么办?
您需要在此处准确定义 fails 的含义,但如果您的意思是:如果被调用的 goroutine 本身调用了 panic
并因此没有到达它的 @ 怎么办? 987654337@ 呼叫,您可以使用defer
来确保即使在紧急情况下也会发生wg.Done()
:
func(args)
defer wg.Done()
... do the work ...
wg.Add(1)
go func(args) // `func` will definitely call `wg.Done`, even if `func` panics
我应该在 func 参数中传递通道还是让它们按原样对程序全局化?
在风格上,全局变量总是有点混乱。这并不意味着您不能使用它们;这取决于你,只要记住所有的权衡。闭包变量没有那么杂乱,但记得要小心for
循环迭代变量:
for i := 0; i < 10; i++
go func()
time.Sleep(50 * time.Millisecond)
fmt.Println(i) // BEWARE BUG: this prints 10, not 0-9
()
表现不佳。 Try this on the Go playground;请注意,go vet
现在在这里抱怨i
的使用不当。
我将您的原始示例代码带到了 Go Playground,并如上所述对其进行了最小的更改。结果是here。 (为了降低速度,我让睡眠等待 n 秒而不是 n 秒。)
【讨论】:
以上是关于基本的 goroutine 和通道模式:一个通道的多个 goroutine的主要内容,如果未能解决你的问题,请参考以下文章
golang Goroutine安全模式使用通道来抽象使用频道。