当涉及多个通道时,选择如何工作?

Posted

技术标签:

【中文标题】当涉及多个通道时,选择如何工作?【英文标题】:How does select work when multiple channels are involved? 【发布时间】:2018-05-18 15:40:29 【问题描述】:

我在多个非缓冲通道上使用 select 时发现

select 
case <- chana:
case <- chanb:

即使两个通道都有数据,但是在处理这个选择时, case chana 和 case chanb 的呼叫不平衡。

package main

import (
    "fmt"
    _ "net/http/pprof"
    "sync"
    "time"
)

func main() 
    chana := make(chan int)
    chanb := make(chan int)

    go func() 
        for i := 0; i < 1000; i++ 
            chana <- 100 * i
        
    ()

    go func() 
        for i := 0; i < 1000; i++ 
            chanb <- i
        
    ()

    time.Sleep(time.Microsecond * 300)

    acount := 0
    bcount := 0
    wg := sync.WaitGroup
    wg.Add(1)
    go func() 
        for 
            select 
            case <-chana:
                acount++
            case <-chanb:
                bcount++
            
            if acount == 1000 || bcount == 1000 
                fmt.Println("finish one acount, bcount", acount, bcount)
                break
            
        
        wg.Done()
    ()

    wg.Wait()

运行这个demo,当一个chana,chanb完成读/写时,另一个可能还剩下999-1。

有什么方法可以保证平衡吗?

找到相关主题golang-channels-select-statement

【问题讨论】:

select 不保证它会选择哪个频道。 select 实际上与“平衡”相反。如果您想平均分配 - 从一个通道读取,然后从另一个通道读取,根本没有 select 如果你的任何 goroutine 真的在做任何工作,你就不会有任何公平问题。 @zerkms 是的,在runtime.selectgo 中找到,实际上它通过使用fastrand 的排序结果检查每个案例。 【参考方案1】:

Go select 声明不偏向于任何(就绪)案例。引用规范:

如果一个或多个通信可以继续,则通过统一伪随机选择选择一个可以继续的通信。否则,如果存在默认情况,则选择该情况。如果没有默认情况,“select”语句会阻塞,直到至少有一个通信可以继续。

如果可以进行多个通信,则随机选择一个。这不是一个完美的随机分布,规范也不能保证,但它是随机的。

您所体验的是 Go Playground 具有 GOMAXPROCS=1 (which you can verify here) 和 goroutine 调度程序没有抢占式的结果。这意味着默认情况下 goroutine 不会并行执行。如果遇到阻塞操作(例如从网络读取,或尝试在阻塞的通道上接收或发送),则将 goroutine 置于停放状态,而另一个准备运行的程序继续运行。

而且由于您的代码中没有阻塞操作,goroutines 可能不会被放置,它可能只有一个“生产者”goroutines 会运行,而另一个可能不会被调度(永远)。

GOMAXPROCS=4 的本地计算机上运行您的代码,我得到了非常“真实”的结果。运行几次,输出:

finish one acount, bcount 1000 901
finish one acount, bcount 1000 335
finish one acount, bcount 1000 872
finish one acount, bcount 427 1000

如果您需要优先处理单个案例,请查看此答案:Force priority of go select statement

select 的默认行为不保证同等优先级,但平均而言会接近。如果您需要保证同等优先级,那么您不应该使用select,但您可以从 2 个通道执行 2 个非阻塞接收序列,如下所示:

for 
    select 
    case <-chana:
        acount++
    default:
    
    select 
    case <-chanb:
        bcount++
    default:
    
    if acount == 1000 || bcount == 1000 
        fmt.Println("finish one acount, bcount", acount, bcount)
        break
    

如果两个提供值,上述 2 个非阻塞接收将以相同的速度(具有相同的优先级)耗尽 2 个通道,如果一个不提供,则持续接收另一个通道,而不会延迟或阻塞。

对此需要注意的一点是,如果 没有 通道提供任何要接收的值,这将基本上是一个“忙碌”循环,因此会消耗计算能力。为了避免这种情况,我们可能会检测到没有一个通道准备好,然后然后对两个接收使用select 语句,然后将阻塞直到其中一个准备好接收,不浪费任何 CPU 资源:

for 
    received := 0
    select 
    case <-chana:
        acount++
        received++
    default:
    
    select 
    case <-chanb:
        bcount++
        received++
    default:
    

    if received == 0 
        select 
        case <-chana:
            acount++
        case <-chanb:
            bcount++
        
    

    if acount == 1000 || bcount == 1000 
        fmt.Println("finish one acount, bcount", acount, bcount)
        break
    

有关 goroutine 调度的更多详细信息,请参阅以下问题:

Number of threads used by Go runtime

Goroutines 8kb and windows OS thread 1 mb

Why does it not create many threads when many goroutines are blocked in writing file in golang?

【讨论】:

您的示例是否会浪费更多的 CPU 时间来执行 for 循环的空迭代,而通过通道的数据更加稀疏?所以基本上一个goroutine总是使用一个CPU来1)检查chana - 空2)选择默认3)检查chanb - 空4)选择默认5)评估if - false。一遍又一遍地而不是与其他 goroutine 共享那个 CPU 来做有用的工作? @aiguy 不错的收获!你是对的,谢谢你指出这一点。我也为这种情况添加了一个解决方案。请参阅编辑后的答案。 @icza 如果我从 2 个频道中选择 ch1ch2 并且两者都有要同时阅读的元素。 go 例程会选择一个 case 来执行,比如ch2。我的问题是:ch1 中的可用元素是否会被拉出? @MạnhQuyếtNguyễn 不,不会,只会执行选择的通信操作。【参考方案2】:

正如评论中提到的,如果你想确保平衡,你可以在阅读 goroutine 中完全放弃使用select 并依赖无缓冲通道提供的同步:

go func() 
    for 
        <-chana
        acount++
        <-chanb
        bcount++

        if acount == 1000 || bcount == 1000 
            fmt.Println("finish one acount, bcount", acount, bcount)
            break
        
    
    wg.Done()
()

【讨论】:

【参考方案3】:

已编辑:您也可以从供应方面进行平衡,但@icza 的回答对我来说似乎是比这更好的选择,并且还解释了首先导致这种情况的调度。令人惊讶的是,即使在我的(虚拟)机器上也是片面的。

这里有一些东西可以平衡供应方面的两个例程(不知何故似乎在 Playground 上不起作用)。

package main

import (
    "fmt"
    _ "net/http/pprof"
    "sync"
    "sync/atomic"
    "time"
)

func main() 
    chana := make(chan int)
    chanb := make(chan int)
    var balanceSwitch int32

    go func() 
        for i := 0; i < 1000; i++ 
            for atomic.LoadInt32(&balanceSwitch) != 0 
                fmt.Println("Holding R1")
                time.Sleep(time.Nanosecond * 1)
            
            chana <- 100 * i
            fmt.Println("R1: Sent i", i)
            atomic.StoreInt32(&balanceSwitch, 1)

        
    ()

    go func() 
        for i := 0; i < 1000; i++ 

            for atomic.LoadInt32(&balanceSwitch) != 1 
                fmt.Println("Holding R2")
                time.Sleep(time.Nanosecond * 1)
            
            chanb <- i
            fmt.Println("R2: Sent i", i)
            atomic.StoreInt32(&balanceSwitch, 0)

        
    ()

    time.Sleep(time.Microsecond * 300)

    acount := 0
    bcount := 0
    wg := sync.WaitGroup
    wg.Add(1)
    go func() 
        for 
            select 
            case <-chana:
                acount++
            case <-chanb:
                bcount++
            
            fmt.Println("Acount Bcount", acount, bcount)
            if acount == 1000 || bcount == 1000 
                fmt.Println("finish one acount, bcount", acount, bcount)
                break
            
        
        wg.Done()
    ()

    wg.Wait()

通过更改atomic.LoadInt32(&amp;balanceSwitch) != XXatomic.StoreInt32(&amp;balanceSwitch, X) 或其他机制,您可以将其映射到任意数量的例程。这可能不是最好的做法,但如果这是一个要求,那么您可能必须考虑这些选项。希望这会有所帮助。

【讨论】:

虚拟机通常分配有 1 个 VCPU,这导致 GOMAXPROCS 默认为 1,这可以解释您遇到的“单向”效应。您可以使用fmt.Println(runtime.GOMAXPROCS(0)) 代码进行验证。 好的。我试过上面的代码。实际上,我分配了 2 个 VCPU。它在我的系统上看起来仍然是片面的。这些是 acount, bcount 的数字:19 1000, 243 1000, 1000 1, 1000 38, 1000 635, 262 1000, 1 1000, 1000 51, 1 1000, 1000 98, 1 1000, 245 1000, 1 1000 > 由于示例中有 3 个 goroutines(2 个生产者和 1 个消费者)在工作,只要可用 CPU 小于 3,则可以用相同的推理来解释副作用。在我的示例中,我有 4 个 CPU,因此没有经历过。

以上是关于当涉及多个通道时,选择如何工作?的主要内容,如果未能解决你的问题,请参考以下文章

Java NIO Selector

它是如何工作的? - 查询中的多个子选择

Java NIO系列教程 Selector

如何连接多个音频输出通道以与 PyAudio 一起使用?

如何编写涉及多个选择的 Oracle SQL 查询

VHDL:当涉及时钟信号时,多路复用器输出不跟随输入