关闭和发送到通道之间的竞争条件

Posted

技术标签:

【中文标题】关闭和发送到通道之间的竞争条件【英文标题】:Race condition between close and send to channel 【发布时间】:2020-04-18 23:42:37 【问题描述】:

我正在尝试使用工作池构建通用管道库。我为源、管道和接收器创建了一个接口。您会看到,管道的工作是从输入通道接收数据,对其进行处理,然后将结果输出到通道上。这是它的预期行为:

    从输入通道接收数据。 将数据委托给可用的工作人员。 worker 将结果发送到输出通道。 所有工作人员完成后关闭输出通道。
func (p *pipe) Process(in chan interface) (out chan interface) 
    var wg sync.WaitGroup
    out = make(chan interface, 100)
    go func() 
        for i := 1; i <= 100; i++ 
            go p.work(in, out, &wg)
        
        wg.Wait()
        close(out)
    ()

    return


func (p *pipe) work(jobs <-chan interface, out chan<- interface, wg *sync.WaitGroup) 
    for j := range jobs 
        func(j Job) 
            defer wg.Done()
            wg.Add(1)

            res := doSomethingWith(j)

            out <- res
        (j)
    


但是,运行它可能会在不处理所有输入的情况下退出,或者会出现send on closed channel 消息的恐慌。使用-race 标志构建源会在close(out)out &lt;- res 之间发出数据竞争警告。

这就是我认为可能发生的事情。一旦许多工人完成了他们的工作,wg 的计数器会在一瞬间达到零。因此,wg.Wait() 完成,程序继续执行close(out)。同时,job 通道还没有完成数据生成,这意味着一些工作人员仍在另一个 goroutine 中运行。由于out 频道已经关闭,因此会导致恐慌。

是否应该将等待组放在其他地方?或者有没有更好的方法来等待所有工人完成?

【问题讨论】:

【参考方案1】:

作业完成的速度可能与发送它们的速度一样快。在这种情况下,即使有更多的项目要处理,WaitGroup 也会浮动在接近于零的位置。

对此的一种解决方法是在发送作业之前添加一个,并在发送完所有作业后减少一个,有效地将发件人视为“作业”之一。在这种情况下,我们最好在发件人中使用wg.Add

func (p *pipe) Process(in chan interface) (out chan interface) 
    var wg sync.WaitGroup
    out = make(chan interface, 100)
    go func() 
        for i := 1; i <= 100; i++ 
            wg.Add(1)
            go p.work(in, out, &wg)
        
        wg.Wait()
        close(out)
    ()

    return


func (p *pipe) work(jobs <-chan interface, out chan<- interface, wg *sync.WaitGroup) 
    for j := range jobs 
        func(j Job) 
            res := doSomethingWith(j)

            out <- res
            wg.Done()
        (j)
    

我在代码中注意到的一件事是每个作业都会启动一个 goroutine。同时,每个作业循环处理jobs 通道,直到清空/关闭。似乎没有必要两者都做。

【讨论】:

如果wg.Add(1)加在p.work()之前,wg不是只能达到100吗?假设队列中有 1000 个作业。 wg在完成100个作业后不会变为0吗? 会的,是的。 pipe.work 的 wg.Done 应该在其第一行被延迟,或者最好移动到一个匿名函数中,将 p.work 调用包装在 pipe.Process 中,这样 wg 根本不需要传递。 是的,没错。您可以将wg 传递给消耗out 的事物,并在每个事物消耗完之后调用wg.Done @icio 是的,那行得通。 @karmakaze 我试图避免out 的用户不得不处理有关wg 的任何细节。 在这种情况下,让单个发送者执行wg.Add,而处理器最后执行wg.Done 是有意义的。【参考方案2】:

目前尚不清楚为什么每个工作都需要一个工人,但如果你这样做了,你可以重组你的外循环设置(见下面未经测试的代码)。这种方式首先消除了对工作池的需求。

不过,总是要先执行wg.Add 剥离任何工人。在这里,您正在剥离 100 名工人:

var wg sync.WaitGroup
out = make(chan interface, 100)
go func() 
    for i := 1; i <= 100; i++ 
        go p.work(in, out, &wg)
    
    wg.Wait()
    close(out)
()

因此你可以这样做:

var wg sync.WaitGroup
out = make(chan interface, 100)
go func() 
    wg.Add(100)  // ADDED - count the 100 workers
    for i := 1; i <= 100; i++ 
        go p.work(in, out, &wg)
    
    wg.Wait()
    close(out)
()

请注意,您现在可以将 wg 本身向下移动到从工作人员分离出来的 goroutine 中。如果您放弃让每个工作人员将工作作为新的 goroutine 分拆的想法,这可以使事情变得更清洁。但是,如果每个 worker 都将衍生出另一个 goroutine,那么该 worker 本身也必须使用 wg.Add,如下所示:

for j := range jobs 
    wg.Add(1)  // ADDED - count the spun-off goroutines
    func(j Job) 
        res := doSomethingWith(j)

        out <- res
        wg.Done()  // MOVED (for illustration only, can defer as before)
    (j)

wg.Done() // ADDED - our work in `p.work` is now done

也就是说,每个匿名函数都是通道的另一个用户,因此在启动新的 goroutine 之前增加用户通道计数 (wg.Add(1))。当你读完输入通道jobs后,调用wg.Done()(可能是通过更早的defer,但我在最后展示了它)。

思考这个问题的关键是wg 计算此时可以写入通道的活动 goroutine 的数量。只有当 no 个 goroutine 打算再写时,它才会变为零。 这样可以安全地关闭通道。


考虑使用更简单(但未经测试):

func (p *pipe) Process(in chan interface) (out chan interface) 
    out = make(chan interface)
    var wg sync.WaitGroup
    go func() 
        defer close(out)
        for j := range in 
            wg.Add(1)
            go func(j Job) 
                res := doSomethingWith(j)
                out <- res
                wg.Done()
            (j)
        
        wg.Wait()
    ()
    return out

您现在有一个 goroutine 正在尽可能快地读取 in 频道,并在运行时分拆作业。每个传入的工作都会得到一个 goroutine,除非他们提前完成工作。没有池,每个作业只有一个工人(与您的代码相同,只是我们淘汰了无用的池)。


或者,由于只有一些可用的 CPU,所以在开始时像以前一样分拆一些 goroutine,但让每个 goroutine 运行 一个 作业以完成,并交付其结果,然后继续阅读下一份工作:

func (p *pipe) Process(in chan interface) (out chan interface) 
    out = make(chan interface)
    go func() 
        defer close(out)
        var wg sync.WaitGroup
        ncpu := runtime.NumCPU()  // or something fancier if you like
        wg.Add(ncpu)
        for i := 0; i < ncpu; i++ 
            go func() 
                defer wg.Done()
                for j := range in 
                    out <- doSomethingWith(j)
                
            ()
        
        wg.Wait()
    
    return out

通过使用runtime.NumCPU(),我们获得的工作人员读取作业的数量与运行作业的 CPU 数量一样多。这些是池,它们一次只做一项工作。

如果输出通道读取器结构良好(即不会导致管道阻塞),通常不需要缓冲输出通道。如果不是,则此处的缓冲深度会限制您可以“提前”完成多少工作,而不是使用结果的人。根据“提前工作”执行此操作的有用程度来设置它 - 不一定是 CPU 的数量,或预期作业的数量,或其他任何东西。

【讨论】:

澄清一下,我并不是要为每个工作分配一个工人。这个想法是,一旦一个工作人员完成了它的工作,它可以立即从工作队列中获取另一个工作。

以上是关于关闭和发送到通道之间的竞争条件的主要内容,如果未能解决你的问题,请参考以下文章

接收器关闭并在尝试通过通道发送时返回 SendError

将结构从一个进程发送到另一个进程的最简单方法是啥? [关闭]

为 Windows 应用程序实现 webrtc 数据通道 [关闭]

Android 中的两个并发 AsyncTasks - 如何处理竞争条件?

Netty 关闭/解除绑定服务器通道

如何关闭多个 goroutine 正在发送的通道?