当涉及多个通道时,选择如何工作?
Posted
技术标签:
【中文标题】当涉及多个通道时,选择如何工作?【英文标题】:How does select work when multiple channels are involved? 【发布时间】:2018-05-18 15:40:29 【问题描述】:我在多个非缓冲通道上使用 select 时发现
select
case <- chana:
case <- chanb:
即使两个通道都有数据,但是在处理这个选择时, case chana 和 case chanb 的呼叫不平衡。
package main
import (
"fmt"
_ "net/http/pprof"
"sync"
"time"
)
func main()
chana := make(chan int)
chanb := make(chan int)
go func()
for i := 0; i < 1000; i++
chana <- 100 * i
()
go func()
for i := 0; i < 1000; i++
chanb <- i
()
time.Sleep(time.Microsecond * 300)
acount := 0
bcount := 0
wg := sync.WaitGroup
wg.Add(1)
go func()
for
select
case <-chana:
acount++
case <-chanb:
bcount++
if acount == 1000 || bcount == 1000
fmt.Println("finish one acount, bcount", acount, bcount)
break
wg.Done()
()
wg.Wait()
运行这个demo,当一个chana,chanb完成读/写时,另一个可能还剩下999-1。
有什么方法可以保证平衡吗?
找到相关主题golang-channels-select-statement
【问题讨论】:
select
不保证它会选择哪个频道。 select
实际上与“平衡”相反。如果您想平均分配 - 从一个通道读取,然后从另一个通道读取,根本没有 select
。
如果你的任何 goroutine 真的在做任何工作,你就不会有任何公平问题。
@zerkms 是的,在runtime.selectgo
中找到,实际上它通过使用fastrand
的排序结果检查每个案例。
【参考方案1】:
Go select
声明不偏向于任何(就绪)案例。引用规范:
如果一个或多个通信可以继续,则通过统一伪随机选择选择一个可以继续的通信。否则,如果存在默认情况,则选择该情况。如果没有默认情况,“select”语句会阻塞,直到至少有一个通信可以继续。
如果可以进行多个通信,则随机选择一个。这不是一个完美的随机分布,规范也不能保证,但它是随机的。
您所体验的是 Go Playground 具有 GOMAXPROCS=1
(which you can verify here) 和 goroutine 调度程序没有抢占式的结果。这意味着默认情况下 goroutine 不会并行执行。如果遇到阻塞操作(例如从网络读取,或尝试在阻塞的通道上接收或发送),则将 goroutine 置于停放状态,而另一个准备运行的程序继续运行。
而且由于您的代码中没有阻塞操作,goroutines 可能不会被放置,它可能只有一个“生产者”goroutines 会运行,而另一个可能不会被调度(永远)。
在GOMAXPROCS=4
的本地计算机上运行您的代码,我得到了非常“真实”的结果。运行几次,输出:
finish one acount, bcount 1000 901
finish one acount, bcount 1000 335
finish one acount, bcount 1000 872
finish one acount, bcount 427 1000
如果您需要优先处理单个案例,请查看此答案:Force priority of go select statement
select
的默认行为不保证同等优先级,但平均而言会接近。如果您需要保证同等优先级,那么您不应该使用select
,但您可以从 2 个通道执行 2 个非阻塞接收序列,如下所示:
for
select
case <-chana:
acount++
default:
select
case <-chanb:
bcount++
default:
if acount == 1000 || bcount == 1000
fmt.Println("finish one acount, bcount", acount, bcount)
break
如果两个提供值,上述 2 个非阻塞接收将以相同的速度(具有相同的优先级)耗尽 2 个通道,如果一个不提供,则持续接收另一个通道,而不会延迟或阻塞。
对此需要注意的一点是,如果 没有 通道提供任何要接收的值,这将基本上是一个“忙碌”循环,因此会消耗计算能力。为了避免这种情况,我们可能会检测到没有一个通道准备好,然后然后对两个接收使用select
语句,然后将阻塞直到其中一个准备好接收,不浪费任何 CPU 资源:
for
received := 0
select
case <-chana:
acount++
received++
default:
select
case <-chanb:
bcount++
received++
default:
if received == 0
select
case <-chana:
acount++
case <-chanb:
bcount++
if acount == 1000 || bcount == 1000
fmt.Println("finish one acount, bcount", acount, bcount)
break
有关 goroutine 调度的更多详细信息,请参阅以下问题:
Number of threads used by Go runtime
Goroutines 8kb and windows OS thread 1 mb
Why does it not create many threads when many goroutines are blocked in writing file in golang?
【讨论】:
您的示例是否会浪费更多的 CPU 时间来执行for
循环的空迭代,而通过通道的数据更加稀疏?所以基本上一个goroutine总是使用一个CPU来1)检查chana - 空2)选择默认3)检查chanb - 空4)选择默认5)评估if - false。一遍又一遍地而不是与其他 goroutine 共享那个 CPU 来做有用的工作?
@aiguy 不错的收获!你是对的,谢谢你指出这一点。我也为这种情况添加了一个解决方案。请参阅编辑后的答案。
@icza 如果我从 2 个频道中选择 ch1
、ch2
并且两者都有要同时阅读的元素。 go 例程会选择一个 case 来执行,比如ch2
。我的问题是:ch1
中的可用元素是否会被拉出?
@MạnhQuyếtNguyễn 不,不会,只会执行选择的通信操作。【参考方案2】:
正如评论中提到的,如果你想确保平衡,你可以在阅读 goroutine 中完全放弃使用select
并依赖无缓冲通道提供的同步:
go func()
for
<-chana
acount++
<-chanb
bcount++
if acount == 1000 || bcount == 1000
fmt.Println("finish one acount, bcount", acount, bcount)
break
wg.Done()
()
【讨论】:
【参考方案3】:已编辑:您也可以从供应方面进行平衡,但@icza 的回答对我来说似乎是比这更好的选择,并且还解释了首先导致这种情况的调度。令人惊讶的是,即使在我的(虚拟)机器上也是片面的。
这里有一些东西可以平衡供应方面的两个例程(不知何故似乎在 Playground 上不起作用)。
package main
import (
"fmt"
_ "net/http/pprof"
"sync"
"sync/atomic"
"time"
)
func main()
chana := make(chan int)
chanb := make(chan int)
var balanceSwitch int32
go func()
for i := 0; i < 1000; i++
for atomic.LoadInt32(&balanceSwitch) != 0
fmt.Println("Holding R1")
time.Sleep(time.Nanosecond * 1)
chana <- 100 * i
fmt.Println("R1: Sent i", i)
atomic.StoreInt32(&balanceSwitch, 1)
()
go func()
for i := 0; i < 1000; i++
for atomic.LoadInt32(&balanceSwitch) != 1
fmt.Println("Holding R2")
time.Sleep(time.Nanosecond * 1)
chanb <- i
fmt.Println("R2: Sent i", i)
atomic.StoreInt32(&balanceSwitch, 0)
()
time.Sleep(time.Microsecond * 300)
acount := 0
bcount := 0
wg := sync.WaitGroup
wg.Add(1)
go func()
for
select
case <-chana:
acount++
case <-chanb:
bcount++
fmt.Println("Acount Bcount", acount, bcount)
if acount == 1000 || bcount == 1000
fmt.Println("finish one acount, bcount", acount, bcount)
break
wg.Done()
()
wg.Wait()
通过更改atomic.LoadInt32(&balanceSwitch) != XX
和atomic.StoreInt32(&balanceSwitch, X)
或其他机制,您可以将其映射到任意数量的例程。这可能不是最好的做法,但如果这是一个要求,那么您可能必须考虑这些选项。希望这会有所帮助。
【讨论】:
虚拟机通常分配有 1 个 VCPU,这导致GOMAXPROCS
默认为 1,这可以解释您遇到的“单向”效应。您可以使用fmt.Println(runtime.GOMAXPROCS(0))
代码进行验证。
好的。我试过上面的代码。实际上,我分配了 2 个 VCPU。它在我的系统上看起来仍然是片面的。这些是 acount, bcount 的数字:19 1000, 243 1000, 1000 1, 1000 38, 1000 635, 262 1000, 1 1000, 1000 51, 1 1000, 1000 98, 1 1000, 245 1000, 1 1000 >
由于示例中有 3 个 goroutines(2 个生产者和 1 个消费者)在工作,只要可用 CPU 小于 3,则可以用相同的推理来解释副作用。在我的示例中,我有 4 个 CPU,因此没有经历过。以上是关于当涉及多个通道时,选择如何工作?的主要内容,如果未能解决你的问题,请参考以下文章