如何关闭多个 goroutine 正在发送的通道?

Posted

技术标签:

【中文标题】如何关闭多个 goroutine 正在发送的通道?【英文标题】:How do I close a channel multiple goroutines are sending on? 【发布时间】:2013-03-20 20:10:40 【问题描述】:

我正在尝试并行进行一些计算。该程序的设计目的是让每个 worker goroutine 将已解决的难题的“碎片”发送回控制器 goroutine,该控制器 goroutine 等待接收并组装从 worker 例程发送的所有内容。

关闭单通道的惯用 Go 是什么?我不能在每个 goroutine 的通道上调用 close,因为那样我可能会在关闭的通道上发送。同样,没有办法预先确定哪个 goroutine 将首先完成。这里需要sync.WaitGroup吗?

【问题讨论】:

【参考方案1】:

这是一个使用sync.WaitGroup 来做你正在寻找的事情的例子,

这个例子接受一个很长的整数列表,然后通过给 N 个并行工作人员一个相同大小的输入数据块来将它们全部相加。可以在go playground上运行:

package main

import (
    "fmt"
    "sync"
)

const WorkerCount = 10

func main() 
    // Some input data to operate on.
    // Each worker gets an equal share to work on.
    data := make([]int, WorkerCount*10)

    for i := range data 
        data[i] = i
    

    // Sum all the entries.
    result := sum(data)

    fmt.Printf("Sum: %d\n", result)


// sum adds up the numbers in the given list, by having the operation delegated
// to workers operating in parallel on sub-slices of the input data.
func sum(data []int) int 
    var sum int

    result := make(chan int)
    defer close(result)

    // Accumulate results from workers.
    go func() 
        for 
            select 
            case value := <-result:
                sum += value
            
        
    ()

    // The WaitGroup will track completion of all our workers.
    wg := new(sync.WaitGroup)
    wg.Add(WorkerCount)

    // Divide the work up over the number of workers.
    chunkSize := len(data) / WorkerCount

    // Spawn workers.
    for i := 0; i < WorkerCount; i++ 
        go func(i int) 
            offset := i * chunkSize

            worker(result, data[offset:offset+chunkSize])
            wg.Done()
        (i)
    

    // Wait for all workers to finish, before returning the result.
    wg.Wait()

    return sum


// worker sums up the numbers in the given list.
func worker(result chan int, data []int) 
    var sum int

    for _, v := range data 
        sum += v
    

    result <- sum

【讨论】:

其中一些代码有点……奇怪。特别是,带有 for/single-case-select 的 goroutine 会累积结果并在不同步的情况下覆盖变量。一些小的重新安排和事情变得更可靠/更容易理解:play.golang.org/p/5bmlTbdIQa【参考方案2】:

是的,这是 sync.WaitGroup 的完美用例。

您的另一种选择是每个 goroutine 使用 1 个通道和一个多路复用器 goroutine,该多路复用器 goroutine 从每个通道馈送到单个通道。但这会很快变得笨拙,所以我只需要一个 sync.WaitGroup。

【讨论】:

以上是关于如何关闭多个 goroutine 正在发送的通道?的主要内容,如果未能解决你的问题,请参考以下文章

信号 goroutine 在通道关闭时停止

如何在处理结果时正确关闭 Goroutines 中的共享通道

单个通道上的多个接收器。谁得到数据?

使用通道作为队列的死锁

如何处理共享同一通道的多个 goroutine

如何返回第一个http响应以回答