Golang 基础:原生并发 goroutine channel 和 select 常见使用场景
Posted 拭心
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang 基础:原生并发 goroutine channel 和 select 常见使用场景相关的知识,希望对你有一定的参考价值。
本文为极客时间 Go 语言第一课 相关章节学习笔记及思考。
文章目录
Go 语言之父 Rob Pike 的经典名言:“不要通过共享内存来通信,应该通过通信来共享内存(Don’t communicate by sharing memory, share memory by communicating)
C/C++ 线程的复杂性:
线程退出时要考虑新创建的线程是否要与主线程分离(detach)
还是需要主线程等待子线程终止(join)并获取其终止状态?
又或者是否需要在新线程中设置取消点(cancel point)来保证被主线程取消(cancel)的时候能顺利退出
goroutine 是由 Go 运行时(runtime)负责调度的、轻量的用户级线程。
优势:
- 占用内存小,goroutine 初始栈只有 2k,比 Linux 线程小多了
- 用户态调度,不需要内核介入,代价更小
- 一退出就会被回收
- 提供 channel 通信
无论是 Go 自身运行时代码还是用户层 Go 代码,都无一例外地运行在 goroutine 中。
goroutine
调用函数、方法(具名、匿名、闭包都可以)时,前面加上 go
关键字,就会创建一个 goroutine。
goroutine 调度原理
Goroutine 调度器的任务:将 Goroutine 按照一定算法放到不同的操作系统线程中去执行。
演进:
- G-M 模型(废弃):将 G(Goroutine) 调度到 M(Machine) 上运行
- G-P-M 模型(使用中):增加中间层 P(Processor),提供队列管理多个 G,然后在合适的时候绑定 M。先后使用协作式、抢占式调度。
- NUMA 调度模型(尚未实现)
图片来自:https://time.geekbang.org/column/article/476643
- G:存储 Goroutine 的执行信息,包括:栈、状态
- P:逻辑处理器,有一个待调度的 G 队列
- M:真正的计算资源,Go 代码运行的真实载体(用户态线程),要执行 G 需要绑定 P,绑定后会从 P 的本地队列和全局队列获取 G 然后执行
//src/runtime/runtime2.go
type g struct
stack stack // offset known to runtime/cgo
sched gobuf
goid int64
gopc uintptr // pc of go statement that created this goroutine
startpc uintptr // pc of goroutine function
... ...
type p struct
lock mutex
id int32
status uint32 // one of pidle/prunning/...
mcache *mcache
racectx uintptr
// Queue of runnable goroutines. Accessed without lock.
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr
// Available G's (status == Gdead)
gfree *g
gfreecnt int32
... ...
type m struct
g0 *g // goroutine with scheduling stack
mstartfn func()
curg *g // current running goroutine
... ...
从 Go 1.2 以后,Go 调度器使用 G-P-M 模型,调度目标:公平地将 G 调度到 P 上运行。
调度策略:
- 常规执行,G 运行超出时间片后抢占调度
- G 阻塞在 channel 或者 I/O 上时,会被放置到等待队列,M 会尝试运行 P 的下一个可运行 G;当 G 可运行时,会被唤醒并修改状态,然后放到某个 P 的队列中,等待被绑定 M、执行
- G 阻塞在 syscall 上时,执行 G 的 M 也会受影响,会解绑 P、进入挂起状态;syscall 返回后,G 会尝试获取可用的 P,没获取到的话,修改状态,等待被运行
如果一个 G 任务运行 10ms,sysmon 就会认为它的运行时间太久而发出抢占式调度的请求。
一旦 G 的抢占标志位被设为 true,那么等到这个 G 下一次调用函数或方法时,运行时就可以将 G 抢占并移出运行状态,放入队列中,等待下一次被调度。
// $GOROOT/src/runtime/proc.go
// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms
channel
和线程一样,一个应用内部启动的所有 goroutine 共享进程空间的资源,如果多个 goroutine 访问同一块内存数据,将会存在竞争。
Go 提供了 channel 作为 goroutine 之间的通信方式,goroutine 可以从 channel 读取数据,处理后再把数据写到 channel 中。
channel 是和 切片、map 类似的复合类型,使用时需要指定具体的类型:
c := make(chan int) //c 是一个 int 类型的 channel
和函数一样,channel 也是“第一公民” 身份,可以做变量、参数、返回值等。
func spawn(f func() error) <-chan error
c := make(chan error)
go func()
c <- f()
()
return c
func main()
c := spawn(func() error
time.Sleep(2 * time.Second)
return errors.New("timeout")
)
fmt.Println(<-c)
main goroutine 与子 goroutine 之间建立了一个元素类型为 error 的 channel,子 goroutine 退出时,会将它执行的函数的错误返回值写入这个 channel,main goroutine 可以通过读取 channel 的值来获取子 goroutine 的退出状态。
channel 的不同类型
通过 make
可以创建 2 种类型的 channel:
- 无缓冲:读写是同步进行,没有对接人的话会一直阻塞着
- 有缓冲:有数据时读不会阻塞;未满时写数据不会阻塞
下面是无 buffer channel 的测试例子:
func testNoBufferChannel()
var c chan int = make(chan int) //无缓冲,同步进行,没有对接人,就会阻塞住
//var c chan int = make(chan int, 5) //有缓冲,容量为 5
//大多数时候,读写要在不同 goroutine,尤其是无缓冲 channel
go func()
fmt.Println("goroutine run")
b := <-c //读取 channel
fmt.Println("read from channel: ", b)
()
fmt.Println("main goroutine before write")
c <- 1 //没有 buffer,写入 channel 时会阻塞,直到有读取
fmt.Println("main goroutine finish")
运行结果:
main goroutine before write
goroutine run
read from channel: 1
main goroutine finish
和预期一致,主 goroutine 在写入无 buffer 的 channel 时会阻塞,直到 子 goroutine 读取。
下面是有 buffer channel 的测试例子:
func testBufferChannel()
c := make(chan int, 1) //有缓冲,容量为 5
//大多数时候,读写要在不同 goroutine,尤其是无缓冲 channel
go func()
fmt.Println("child_goroutine run")
b := <-c //读取 channel,有数据时不会阻塞
fmt.Println("child_goroutine read from channel: ", b)
()
fmt.Println("main goroutine before write first")
c <- 1 //有 buffer,写入 channel 时不会阻塞,除非满了
fmt.Println("main_goroutine first write finish, len:", len(c))
fmt.Println("main_goroutine write second:")
c <-2
fmt.Println("main_goroutine finish, len:", len(c))
time.Sleep( 3 * time.Second) //不加这个子 goroutine 没执行就退出了
运行结果:
main goroutine before write first
main_goroutine first write finish, len: 1
main_goroutine write second:
child_goroutine run
child_goroutine read from channel: 1
main_goroutine finish, len: 1
可以看到
- 第一次写完立刻就返回;第二次写时,因为这个 goroutine 已经满了,所以阻塞在写上
- 子 goroutine 读取了一次,主 goroutine 才从写上返回
作为参数的单向类型
- 只发送, chan<-
- 只接收, <-chan
func testSingleDirectionChannel()
f := func(a chan<- int, b <- chan int) //a 是只能写入,b 是只能读取
x := <- a //编译报错:Invalid operation: <- a (receive from send-only type chan<- int)
b <- 2 //编译报错:nvalid operation: b <- 2 (send to receive-only type <-chan int)
通常只发送 channel 类型和只接收 channel 类型,会被用作函数的参数类型或返回值,用于限制对 channel 内的操作,或者是明确可对 channel 进行的操作的类型
普通channel,可以传入函数作为只发送或只接收类型
关闭 channel
close(channel) 后,不同语句的结果:
func testCloseChannel()
a := make(chan int)
close(a) //先关闭,然后看下几种读取关闭 channel 的结果
b := <- a
fmt.Println("关闭后直接读取:", b) //0
c, ok := <-a
fmt.Println("关闭后通过逗号 ok 读取:", c, ok) //0 false
for v := range a //关闭的话直接跳过
fmt.Println("关闭后通过 for-range 读取", v)
通过“comma, ok” 我们可以知道 channel 是否被关闭。
一般由发送端负责关闭 channel,原因:
- 向一个关闭的 channel 中发送数据,会 panic (⚠️注意了!!!)
- 发送端没有办法判断 channel 是否已经关闭。
len(channel)
当 ch 为无缓冲 channel 时,len(ch) 总是返回 0;当 ch 为带缓冲 channel 时,len(ch) 返回当前 channel ch 中尚未被读取的元素个数。
如果只是想知道 channel 中是否有数据、不想阻塞,可以使用 len(channel) 先做检查:
nil channel
默认读取一个关闭的 channel,会返回零值。但是读取一个 nil channel,操作将阻塞。
所以在有些场景下,可能需要手动修改 channel 为 nil,以实现阻塞的效果,比如在 select 语句中。
无缓冲 channel 的常见用途 🔥
Go 语言倡导:
Do not communicate by sharing memory; instead, share memory by communicating.
不要通过共享内存来通信,而是通过通信来共享内存
多 goroutine 通信:信号
基于无 buffer channel,可以实现一对一和一对多的信号传递。
1.一对一
type signal struct
//接收一个函数,在子 routine 里执行,然后返回一个 channel,用于主 routine 等待
func spawn(f func()) <-chan signal
c := make(chan signal)
go func()
fmt.Println("exec f in child_routine");
f();
fmt.Println("f exec finished, write to channel")
c<- signal
()
return c
//测试使用无 buffer channel 实现信号
func testUseNonBufferChannelImplSignal()
//模拟主 routine 等待子 routine
worker := func()
fmt.Println("do some work")
time.Sleep(3 * time.Second)
fmt.Println("start a worker...")
c := spawn(worker)
fmt.Println("spawn finished, read channel...")
<-c //读取,阻塞等待
fmt.Println("worker finished")
上面的代码中,主 routine 创建了一个函数,然后在子 routine 中执行,主 routine 阻塞在一个 channel 上,等待子 routine 完成后继续执行。
执行结果:
start a worker...
spawn finished, read channel...
exec f in child_routine
do some work
f exec finished, write to channel
worker finished
可以看到,这样的确实现了类似“信号”的机制:在一个 routine 中通知另一个 routine。
如果 channel 的类型复杂些,就可以传递任意数据了!
struct 大小是0,不占内存
2.一对多
关闭一个无 buffer channel 会让所有阻塞在这个 channel 上的 read 操作返回,基于此我们可以实现 1 对 n 的“广播”机制。
var waitGroup sync.WaitGroup
func spawnGroup(f func(ind int), count int, groupSignal chan struct) <-chan signal
c := make(chan signal) //用于让主 routine 阻塞的 channel
waitGroup.Add(count) //等待总数
//创建 n 个 goroutine
for i := 0; i < count; i++
go func(index int)
<- groupSignal //读取阻塞,等待通知执行
//fmt.Println("exec f in child_routine, index: ", i);
//⚠️注意上面注释的代码,这里不能直接访问 for 循环的 i,因为这个是复用的,会导致访问的值不是目标值
fmt.Println("exec f in child_routine, index: ", index);
f(index);
fmt.Println(index , " exec finished, write to channel")
waitGroup.Done()
(i + 1)
//创建通知主 routine 结束的 routine,不能阻塞当前函数
go func()
//需要同步等待所有子 routine 执行完
waitGroup.Wait()
c <- signal //写入数据
()
return c
func testUseNonBufferChannelImplGroupSignal()
worker := func(i int)
fmt.Println("do some work, index ", i)
time.Sleep(3 * time.Second)
groupSignal := make(chan struct)
c := spawnGroup(worker, 5, groupSignal)
fmt.Println("main routine: close channel")
close(groupSignal) //通知刚创建的所有 routine
fmt.Println("main routine: read channel...")
<- c //阻塞在这里
fmt.Println("main routine: all worker finished")
上面的代码做了这些事:
- 创建 channelA,传递给多个 goroutine
- 子 routine 中读取等待这个 channelA
- 主 routine 关闭 channel,然后阻塞在 channelB 上,此时所有子 routine 开始执行
- 所有子 routine 执行完后,通过 channelB 唤醒主 routine
运行结果:
main routine: close channel
main routine: read channel
exec f in child_routine, index: 2
do some work, index 2
exec f in child_routine, index: 1
do some work, index 1
exec f in child_routine, index: 3
do some work, index 3
exec f in child_routine, index: 4
do some work, index 4
exec f in child_routine, index: 5
do some work, index 5
4 exec finished, write to channel
5 exec finished, write to channel
3 exec finished, write to channel
1 exec finished, write to channel
2 exec finished, write to channel
main routine: all worker finished
一句话总结:
用 2 个 channel 实现了 【主 routine 通知所有子 routine 开始】 和【子 routine 通知主 routine 任务结束】。
多 goroutine 同步:通过阻塞,替代锁
type NewCounter struct
c chan int
i int
func CreateNewCounter() *NewCounter
counter := &NewCounter
c: make(chan int),
i: 0,
go func()
for
counter.i ++
counter.c <- counter.i //每次加一,阻塞在这里
()
return counter
func (c *NewCounter)Increase() int
return <- c.c //读取到的值,是上一次加一
//多协程并发增加计数,通过 channel 写入阻塞,读取时加一
func testCounterWithChannel()
fmt.Println("\\ntestCounterWithChannel ->>>")
group := sync.WaitGroup
counter := CreateNewCounter()
for i:=0 ; i<10 ; i++
group.Add(1)
go func(i int)
count := counter.Increase()
fmt.Printf("Goroutine-%d, count %d \\n", i, count)
(i)
group.Wait()
上面的代码中,我们创建了一个单独的协程,在其中循环增加计数,但每次加一后,就会尝试写入 channel(无 buffer 的),在没有人读取时,会阻塞在这个方法上。
然后在 10 个协程里并发读取 channel,从而实现每次读取递增。
带缓冲 channel 的常见用途 🔥
消息队列
channel 的特性符合对消息队列的要求:
- 跨 goroutine 访问安全
- FIFO
- 可设置容量
- 异步收发
Go 支持 channel 的初衷是将它作为 Goroutine 间的通信手段,它并不是专门用于消息队列场景的。
如果你的项目需要专业消息队列的功能特性,比如支持优先级、支持权重、支持离线持久化等,那么 channel 就不合适了,可以使用第三方的专业的消息队列实现。
计数信号量
由于带 buffer channel 的特性(容量满时写入会阻塞),可以用它的容量表示同时最大并发数量。
下面是一个例子:
var active = make(chan struct, 3) //"信号量",最多 3 个
var jobs = make(chan int, 10)
//使用带缓存的 channel,容量就是信号量的大小
func testSemaphoreWithBufferChannel()
//先写入数据,用作表示任务
go func()
for i:= 0; i < 9; i++
jobs <- i + 1
close(jobs)
()
var wg sync.WaitGroup
for j := range jobs
wg.Add(1)
//执行任务
go func(i int)
//通知开始执行,当容量用完时,阻塞
active <- struct
//fmt.Println("exec job ", i)
log.Printf("exec job: %d, length of active: %d \\n", i, len(active))
time.Sleep(2 * time.Second)
//执行完,通知结束
<- active
wg.Done()
(j)
wg.Wait()
上面的代码中,我们用 channel jobs 表示要执行的任务(这里为 8 个),然后用 channel active 表示信号量(最多三个)。
然后在 8 个 goroutine 里执行任务,每个任务耗时 2s。在每次执行任务前,先写入 channel 表示获取信号量;执行完后读取,表示释放信号量。
由于信号量最多三个,所以同一时刻最多能有 3 个任务得以执行。
运行结果如下,符合预期:
2022/04/20 19:14:26 exec job: 1, length of active: 1
2022/04/20 19:14:26 exec job: 9, length of active: 2
2022/04/20 19:14:26 exec job: 5, length of active: 3
2022/04/20 19:14:28 exec job: 6, length of active: 3
2022/04/20 19:14:28 exec job: 7, length of active: 3
2022/04/20 19:14:28 exec job: 8, length of active: 3
2022/04/20 19:14:30 exec job: 3, length of active: 3
2022/04/20 19:14:30 exec job: 2, length of active: 3
2022/04/20 19:14:30 exec job: 4, length of active: 3
select
当需要在一个 goroutine 同时读/写多个 channel 时,可以使用 select:
类似 Linux 的 I/O 多路复用思路,我们可以叫它:goroutine 多路复用。
func testSelect()
channelA := make(chan int)
channelB := make(chan int)
go func()
var readA bool
var readB bool
for
select
case x := <- channelA:
fmt.Println("child_routine: read from channelA:", x)
readA = true
case y := <- channelB:
fmt.Println("child_routine: read from channelB:", y)
readB = true
//default:
// //其他 case 阻塞,就执行 default
// fmt.Println("default")
if readA && readB
fmt.Println("child_goroutine finish")
return;
else
fmt.Println("child_goroutine still loop, ", readA, readB)
()
fmt.Println("main goroutine")
time.Sleep(2 * time.Second)
fmt.Println("main goroutine, write to channelA")
channelA <- 111
fmt.Println("main goroutine, write to channelA finish")
time.Sleep(1 * time.Second)
fmt.Println("main goroutine, write to channelB")
channelB <- 111
fmt.Println("main goroutine, write to channelB finish")
time.Sleep( 5 * time.Second)
fmt.Println("main goroutinefinish")
输出:
main goroutine
main goroutine, write to channelA
main goroutine, write to channelA finish
child_routine: read from channelA: 111
child_goroutine still loop, true false
main goroutine, write to channelB
main goroutine, write to channelB finish
child_routine: read from channelB: 111
child_goroutine finish
main goroutinefinish
可以看到:
- 使用 select 在一个 goroutine 里读取了 2 个 channel
- 这 2 个 case 里的 channel 都不可读时,select 阻塞,只会执行 default,不会执行 select 代码块以外的
- 主 goroutine 写入数据后,select 的其中一个 case 返回,然后继续执行 select 后面的逻辑
- 下一轮循环后 2 个 case 都不可读,继续阻塞
- 然后主 goroutine 写入后,另外一个 case 也返回,循环结束
channel 与 select 结合的常见用途 🔥
利用 default 分支避免阻塞
select 的 default 分支语义:当所有 case 语句里读/写 channel 阻塞时,会执行 default!
无论 channel 是否有 buffer。
有些时候,我们可能不希望阻塞在写入 channel 上,那可以利用 select default 的特性,这样封装一个函数,当写入阻塞时,返回一个 false,让外界可以处理阻塞的情况:
func tryWriteChannel(c chan<- int, value int) bool
select
case c <- value
return true
default: //其他没就绪时,会执行
return false
这样使用:
//active <- 1 //之前直接写 channel,如果满了,就会阻塞
writed := tryWriteChannel(active, 1) //改成这样,可以在阻塞时,处理相关逻辑
if !writed
log.Println("failed to write channel")
return
实现超时
假如我们想在一个 channel 的读/写操作上加一个超时逻辑,可以通过这样实现:
在 select 代码块中,加一个 case,这个 case 会在超时后执行,这样会结束其他 case。
比如这样:
func tryGetSemaphore(c chan<- struct) bool
select
case c <- struct :
return true
case <- time.After(1 * time.Second): //在写 channel 的基础上,额外加一个情况,超时情况
log.Println("timeout!!!")
//1s 后返回,可以在这里做超时处理
return true
及时调用 timer 的 Stop 方法回收 Timer 资源。
心跳机制
循环执行一个额外的 case,这个 case 会定时返回。
func worker()
heartbeat := time.NewTicker(30 * time.Second)
defer heartbeat.Stop()
for
select
case <-c:
// ... do some stuff
case <- heartbeat.C:
//... do heartbeat stuff
time.NewTicker
会创建一个定时执行的心跳,可以把这个 ticker channel 读取的操作放到一个 case 里,这样 select 代码块就会定时执行一次。
ticker 也要及时 Stop。
总结
本文介绍了 Golang 中通过 goroutine channel 和 select 实现并发操作的一些典型场景。
可以看到,通过 goroutine 实现并发是如此的简单;通过 channel 无 buffer 和有 buffer,实现一些 goroutine 同步机制也比较方便;结合 select,实现 goroutine 的统一管理。
在学习一门语言时,既要结合已有的语言知识,也要吸收新语言的设计思想。
需要记住的是,Go 提倡通过 CSP 模型(communicating sequential processes 通信顺序进程)进行通信,而不是传统语言的共享内存方式。
CSP:两个独立的并发实体通过共享 channel(管道)进行通信的并发模型。
我们在遇到多 goroutine 通信、同步的情况,可以尽量多使本文的内容进行处理。
不过对于某些情况,也可以使用 go 提供的 sync 包下的内容,进行局部同步。下篇文章我们就来看看这些内容。
对于局部情况,比如涉及性能敏感的区域或需要保护的结构体数据时,我们可以使用更为高效的低级同步原语(如 mutex),保证 goroutine 对数据的同步访问。
以上是关于Golang 基础:原生并发 goroutine channel 和 select 常见使用场景的主要内容,如果未能解决你的问题,请参考以下文章
golang goroutine例子[golang并发代码片段]