Golang 基础:原生并发 goroutine channel 和 select 常见使用场景

Posted 拭心

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang 基础:原生并发 goroutine channel 和 select 常见使用场景相关的知识,希望对你有一定的参考价值。

本文为极客时间 Go 语言第一课 相关章节学习笔记及思考。

文章目录

Go 语言之父 Rob Pike 的经典名言:“不要通过共享内存来通信,应该通过通信来共享内存(Don’t communicate by sharing memory, share memory by communicating)

C/C++ 线程的复杂性:

线程退出时要考虑新创建的线程是否要与主线程分离(detach)
还是需要主线程等待子线程终止(join)并获取其终止状态?
又或者是否需要在新线程中设置取消点(cancel point)来保证被主线程取消(cancel)的时候能顺利退出

goroutine 是由 Go 运行时(runtime)负责调度的、轻量的用户级线程。

优势:

  1. 占用内存小,goroutine 初始栈只有 2k,比 Linux 线程小多了
  2. 用户态调度,不需要内核介入,代价更小
  3. 一退出就会被回收
  4. 提供 channel 通信

无论是 Go 自身运行时代码还是用户层 Go 代码,都无一例外地运行在 goroutine 中。

goroutine

调用函数、方法(具名、匿名、闭包都可以)时,前面加上 go 关键字,就会创建一个 goroutine。

goroutine 调度原理

Goroutine 调度器的任务:将 Goroutine 按照一定算法放到不同的操作系统线程中去执行。

演进:

  • G-M 模型(废弃):将 G(Goroutine) 调度到 M(Machine) 上运行
  • G-P-M 模型(使用中):增加中间层 P(Processor),提供队列管理多个 G,然后在合适的时候绑定 M。先后使用协作式、抢占式调度。
  • NUMA 调度模型(尚未实现)

图片来自:https://time.geekbang.org/column/article/476643

  • G:存储 Goroutine 的执行信息,包括:栈、状态
  • P:逻辑处理器,有一个待调度的 G 队列
  • M:真正的计算资源,Go 代码运行的真实载体(用户态线程),要执行 G 需要绑定 P,绑定后会从 P 的本地队列和全局队列获取 G 然后执行

//src/runtime/runtime2.go
type g struct 
    stack      stack   // offset known to runtime/cgo
    sched      gobuf
    goid       int64
    gopc       uintptr // pc of go statement that created this goroutine
    startpc    uintptr // pc of goroutine function
    ... ...


type p struct 
    lock mutex

    id          int32
    status      uint32 // one of pidle/prunning/...
  
    mcache      *mcache
    racectx     uintptr

    // Queue of runnable goroutines. Accessed without lock.
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr

    runnext guintptr

    // Available G's (status == Gdead)
    gfree    *g
    gfreecnt int32

    ... ...




type m struct 
    g0            *g     // goroutine with scheduling stack
    mstartfn      func()
    curg          *g     // current running goroutine
    ... ...

从 Go 1.2 以后,Go 调度器使用 G-P-M 模型,调度目标:公平地将 G 调度到 P 上运行。

调度策略:

  1. 常规执行,G 运行超出时间片后抢占调度
  2. G 阻塞在 channel 或者 I/O 上时,会被放置到等待队列,M 会尝试运行 P 的下一个可运行 G;当 G 可运行时,会被唤醒并修改状态,然后放到某个 P 的队列中,等待被绑定 M、执行
  3. G 阻塞在 syscall 上时,执行 G 的 M 也会受影响,会解绑 P、进入挂起状态;syscall 返回后,G 会尝试获取可用的 P,没获取到的话,修改状态,等待被运行

如果一个 G 任务运行 10ms,sysmon 就会认为它的运行时间太久而发出抢占式调度的请求。
一旦 G 的抢占标志位被设为 true,那么等到这个 G 下一次调用函数或方法时,运行时就可以将 G 抢占并移出运行状态,放入队列中,等待下一次被调度。

// $GOROOT/src/runtime/proc.go


// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

channel

和线程一样,一个应用内部启动的所有 goroutine 共享进程空间的资源,如果多个 goroutine 访问同一块内存数据,将会存在竞争。

Go 提供了 channel 作为 goroutine 之间的通信方式,goroutine 可以从 channel 读取数据,处理后再把数据写到 channel 中。

channel 是和 切片、map 类似的复合类型,使用时需要指定具体的类型:

c := make(chan int) //c 是一个 int 类型的 channel

和函数一样,channel 也是“第一公民” 身份,可以做变量、参数、返回值等。


func spawn(f func() error) <-chan error 
    c := make(chan error)

    go func() 
        c <- f()
    ()

    return c


func main() 
    c := spawn(func() error 
        time.Sleep(2 * time.Second)
        return errors.New("timeout")
    )
    fmt.Println(<-c)

main goroutine 与子 goroutine 之间建立了一个元素类型为 error 的 channel,子 goroutine 退出时,会将它执行的函数的错误返回值写入这个 channel,main goroutine 可以通过读取 channel 的值来获取子 goroutine 的退出状态。

channel 的不同类型

通过 make 可以创建 2 种类型的 channel:

  1. 无缓冲:读写是同步进行,没有对接人的话会一直阻塞着
  2. 有缓冲:有数据时读不会阻塞;未满时写数据不会阻塞

下面是无 buffer channel 的测试例子:

func testNoBufferChannel() 
    var c chan int = make(chan int) //无缓冲,同步进行,没有对接人,就会阻塞住
    //var c chan int = make(chan int, 5)    //有缓冲,容量为 5

    //大多数时候,读写要在不同 goroutine,尤其是无缓冲 channel
    go func() 
        fmt.Println("goroutine run")
        b := <-c //读取 channel
        fmt.Println("read from channel: ", b)
    ()

    fmt.Println("main goroutine before write")
    c <- 1  //没有 buffer,写入 channel 时会阻塞,直到有读取
    fmt.Println("main goroutine finish")

运行结果:

main goroutine before write
goroutine run
read from channel:  1
main goroutine finish

和预期一致,主 goroutine 在写入无 buffer 的 channel 时会阻塞,直到 子 goroutine 读取。

下面是有 buffer channel 的测试例子:

func testBufferChannel() 
    c := make(chan int, 1)  //有缓冲,容量为 5

    //大多数时候,读写要在不同 goroutine,尤其是无缓冲 channel
    go func() 
        fmt.Println("child_goroutine run")
        b := <-c //读取 channel,有数据时不会阻塞
        fmt.Println("child_goroutine read from channel: ", b)
    ()

    fmt.Println("main goroutine before write first")
    c <- 1  //有 buffer,写入 channel 时不会阻塞,除非满了
    fmt.Println("main_goroutine first write finish, len:", len(c))

    fmt.Println("main_goroutine write second:")
    c <-2
    fmt.Println("main_goroutine finish, len:", len(c))

    time.Sleep( 3 * time.Second)    //不加这个子 goroutine 没执行就退出了

运行结果:

main goroutine before write first
main_goroutine first write finish, len: 1
main_goroutine write second:
child_goroutine run
child_goroutine read from channel:  1
main_goroutine finish, len: 1

可以看到

  1. 第一次写完立刻就返回;第二次写时,因为这个 goroutine 已经满了,所以阻塞在写上
  2. 子 goroutine 读取了一次,主 goroutine 才从写上返回

作为参数的单向类型

  1. 只发送, chan<-
  2. 只接收, <-chan
func testSingleDirectionChannel() 

    f := func(a chan<- int, b <- chan int)     //a 是只能写入,b 是只能读取
        x := <- a   //编译报错:Invalid operation: <- a (receive from send-only type chan<- int)
        b <- 2      //编译报错:nvalid operation: b <- 2 (send to receive-only type <-chan int)
    

通常只发送 channel 类型和只接收 channel 类型,会被用作函数的参数类型或返回值,用于限制对 channel 内的操作,或者是明确可对 channel 进行的操作的类型

普通channel,可以传入函数作为只发送或只接收类型

关闭 channel

close(channel) 后,不同语句的结果:

func testCloseChannel() 
    a := make(chan int)
    close(a)    //先关闭,然后看下几种读取关闭 channel 的结果
    b := <- a
    fmt.Println("关闭后直接读取:", b)  //0
    c, ok := <-a
    fmt.Println("关闭后通过逗号 ok 读取:", c, ok)    //0 false

    for v := range a   //关闭的话直接跳过
        fmt.Println("关闭后通过 for-range 读取", v)
    

通过“comma, ok” 我们可以知道 channel 是否被关闭。

一般由发送端负责关闭 channel,原因:

  1. 向一个关闭的 channel 中发送数据,会 panic (⚠️注意了!!!)
  2. 发送端没有办法判断 channel 是否已经关闭。

len(channel)

当 ch 为无缓冲 channel 时,len(ch) 总是返回 0;当 ch 为带缓冲 channel 时,len(ch) 返回当前 channel ch 中尚未被读取的元素个数。

如果只是想知道 channel 中是否有数据、不想阻塞,可以使用 len(channel) 先做检查:

nil channel

默认读取一个关闭的 channel,会返回零值。但是读取一个 nil channel,操作将阻塞。

所以在有些场景下,可能需要手动修改 channel 为 nil,以实现阻塞的效果,比如在 select 语句中。

无缓冲 channel 的常见用途 🔥

Go 语言倡导:

Do not communicate by sharing memory; instead, share memory by communicating.
不要通过共享内存来通信,而是通过通信来共享内存

多 goroutine 通信:信号

基于无 buffer channel,可以实现一对一和一对多的信号传递。

1.一对一

type signal struct

//接收一个函数,在子 routine 里执行,然后返回一个 channel,用于主 routine 等待
func spawn(f func()) <-chan signal 
    c := make(chan signal)
    go func() 
        fmt.Println("exec f in child_routine");
        f();
        fmt.Println("f exec finished, write to channel")
        c<- signal
    ()
    return c


//测试使用无 buffer channel 实现信号
func testUseNonBufferChannelImplSignal() 
    //模拟主 routine 等待子 routine

    worker := func() 
        fmt.Println("do some work")
        time.Sleep(3 * time.Second)
    

    fmt.Println("start a worker...")
    c := spawn(worker)

    fmt.Println("spawn finished, read channel...")
    <-c //读取,阻塞等待

    fmt.Println("worker finished")

上面的代码中,主 routine 创建了一个函数,然后在子 routine 中执行,主 routine 阻塞在一个 channel 上,等待子 routine 完成后继续执行。

执行结果:

start a worker...
spawn finished, read channel...
exec f in child_routine
do some work
f exec finished, write to channel
worker finished

可以看到,这样的确实现了类似“信号”的机制:在一个 routine 中通知另一个 routine。
如果 channel 的类型复杂些,就可以传递任意数据了!

struct 大小是0,不占内存

2.一对多

关闭一个无 buffer channel 会让所有阻塞在这个 channel 上的 read 操作返回,基于此我们可以实现 1 对 n 的“广播”机制。

var waitGroup sync.WaitGroup

func spawnGroup(f func(ind int), count int, groupSignal chan struct) <-chan signal 
	c := make(chan signal)	//用于让主 routine 阻塞的 channel
	waitGroup.Add(count)	//等待总数

	//创建 n 个 goroutine
	for i := 0; i < count; i++ 
		go func(index int) 
			<- groupSignal	//读取阻塞,等待通知执行

			//fmt.Println("exec f in child_routine, index: ", i);
			//⚠️注意上面注释的代码,这里不能直接访问 for 循环的 i,因为这个是复用的,会导致访问的值不是目标值

			fmt.Println("exec f in child_routine, index: ", index);
			f(index);
			fmt.Println(index , " exec finished, write to channel")

			waitGroup.Done()
		(i + 1)
	

	//创建通知主 routine 结束的 routine,不能阻塞当前函数
	go func() 
		//需要同步等待所有子 routine 执行完
		waitGroup.Wait()
		c <- signal	//写入数据
	()
	return c



func testUseNonBufferChannelImplGroupSignal() 
	worker := func(i int) 
		fmt.Println("do some work, index ", i)
		time.Sleep(3 * time.Second)
	

	groupSignal := make(chan struct)
	c := spawnGroup(worker, 5, groupSignal)

	fmt.Println("main routine: close channel")
	close(groupSignal)	//通知刚创建的所有 routine


	fmt.Println("main routine: read channel...")
	<- c	//阻塞在这里

	fmt.Println("main routine: all worker finished")


上面的代码做了这些事:

  1. 创建 channelA,传递给多个 goroutine
  2. 子 routine 中读取等待这个 channelA
  3. 主 routine 关闭 channel,然后阻塞在 channelB 上,此时所有子 routine 开始执行
  4. 所有子 routine 执行完后,通过 channelB 唤醒主 routine

运行结果:

main routine: close channel
main routine: read channel
exec f in child_routine, index:  2
do some work, index  2
exec f in child_routine, index:  1
do some work, index  1
exec f in child_routine, index:  3
do some work, index  3
exec f in child_routine, index:  4
do some work, index  4
exec f in child_routine, index:  5
do some work, index  5
4  exec finished, write to channel
5  exec finished, write to channel
3  exec finished, write to channel
1  exec finished, write to channel
2  exec finished, write to channel
main routine: all worker finished

一句话总结:
用 2 个 channel 实现了 【主 routine 通知所有子 routine 开始】 和【子 routine 通知主 routine 任务结束】。

多 goroutine 同步:通过阻塞,替代锁

type NewCounter struct 
	c chan int
	i int


func CreateNewCounter() *NewCounter 
	counter := &NewCounter
		c: make(chan int),
		i: 0,
	

	go func() 
		for 
			counter.i ++
			counter.c <- counter.i		//每次加一,阻塞在这里
		
	()

	return counter


func (c *NewCounter)Increase() int 
	return <- c.c		//读取到的值,是上一次加一


//多协程并发增加计数,通过 channel 写入阻塞,读取时加一
func testCounterWithChannel() 
	fmt.Println("\\ntestCounterWithChannel ->>>")

	group := sync.WaitGroup
	counter := CreateNewCounter()

	for i:=0 ; i<10 ; i++ 
		group.Add(1)

		go func(i int) 
			count := counter.Increase()
			fmt.Printf("Goroutine-%d, count %d \\n", i, count)
		(i)
	

	group.Wait()


上面的代码中,我们创建了一个单独的协程,在其中循环增加计数,但每次加一后,就会尝试写入 channel(无 buffer 的),在没有人读取时,会阻塞在这个方法上。

然后在 10 个协程里并发读取 channel,从而实现每次读取递增。

带缓冲 channel 的常见用途 🔥

消息队列

channel 的特性符合对消息队列的要求:

  1. 跨 goroutine 访问安全
  2. FIFO
  3. 可设置容量
  4. 异步收发

Go 支持 channel 的初衷是将它作为 Goroutine 间的通信手段,它并不是专门用于消息队列场景的。
如果你的项目需要专业消息队列的功能特性,比如支持优先级、支持权重、支持离线持久化等,那么 channel 就不合适了,可以使用第三方的专业的消息队列实现。

计数信号量

由于带 buffer channel 的特性(容量满时写入会阻塞),可以用它的容量表示同时最大并发数量。

下面是一个例子:

var active = make(chan struct, 3)	//"信号量",最多 3 个
var jobs = make(chan int, 10)

//使用带缓存的 channel,容量就是信号量的大小
func testSemaphoreWithBufferChannel() 

	//先写入数据,用作表示任务
	go func() 
		for i:= 0; i < 9; i++ 
			jobs <- i + 1
		
		close(jobs)
	()

	var wg sync.WaitGroup

	for j := range jobs 
		wg.Add(1)

		//执行任务
		go func(i int) 
			//通知开始执行,当容量用完时,阻塞
			active <- struct

			//fmt.Println("exec job ", i)
			log.Printf("exec job: %d, length of active: %d \\n", i, len(active))
			time.Sleep(2 * time.Second)

			//执行完,通知结束
			<- active
			wg.Done()

		(j)
	

	wg.Wait()

上面的代码中,我们用 channel jobs 表示要执行的任务(这里为 8 个),然后用 channel active 表示信号量(最多三个)。

然后在 8 个 goroutine 里执行任务,每个任务耗时 2s。在每次执行任务前,先写入 channel 表示获取信号量;执行完后读取,表示释放信号量。

由于信号量最多三个,所以同一时刻最多能有 3 个任务得以执行。

运行结果如下,符合预期:

2022/04/20 19:14:26 exec job: 1, length of active: 1 
2022/04/20 19:14:26 exec job: 9, length of active: 2 
2022/04/20 19:14:26 exec job: 5, length of active: 3 
2022/04/20 19:14:28 exec job: 6, length of active: 3 
2022/04/20 19:14:28 exec job: 7, length of active: 3 
2022/04/20 19:14:28 exec job: 8, length of active: 3 
2022/04/20 19:14:30 exec job: 3, length of active: 3 
2022/04/20 19:14:30 exec job: 2, length of active: 3 
2022/04/20 19:14:30 exec job: 4, length of active: 3 

select

当需要在一个 goroutine 同时读/写多个 channel 时,可以使用 select:

类似 Linux 的 I/O 多路复用思路,我们可以叫它:goroutine 多路复用。


func testSelect() 
    channelA := make(chan int)
    channelB := make(chan int)

    go func() 
        var readA bool
        var readB bool

        for 
            select 
            case x := <- channelA:
                fmt.Println("child_routine: read from channelA:", x)
                readA = true
            case y := <- channelB:
                fmt.Println("child_routine:  read from channelB:", y)
                readB = true
            //default:
            //  //其他 case 阻塞,就执行 default
            //  fmt.Println("default")
            

            if readA && readB 

                fmt.Println("child_goroutine finish")
                return;
             else 
                fmt.Println("child_goroutine still loop, ", readA, readB)
            
        
    ()

    fmt.Println("main goroutine")

    time.Sleep(2 * time.Second)

    fmt.Println("main goroutine, write to channelA")
    channelA <- 111
    fmt.Println("main goroutine, write to channelA finish")

    time.Sleep(1 * time.Second)

    fmt.Println("main goroutine, write to channelB")
    channelB <- 111
    fmt.Println("main goroutine, write to channelB finish")

    time.Sleep( 5 * time.Second)
    fmt.Println("main goroutinefinish")

输出:

main goroutine
main goroutine, write to channelA
main goroutine, write to channelA finish
child_routine: read from channelA: 111
child_goroutine still loop,  true false
main goroutine, write to channelB
main goroutine, write to channelB finish
child_routine:  read from channelB: 111
child_goroutine finish
main goroutinefinish

可以看到:

  1. 使用 select 在一个 goroutine 里读取了 2 个 channel
  2. 这 2 个 case 里的 channel 都不可读时,select 阻塞,只会执行 default,不会执行 select 代码块以外的
  3. 主 goroutine 写入数据后,select 的其中一个 case 返回,然后继续执行 select 后面的逻辑
  4. 下一轮循环后 2 个 case 都不可读,继续阻塞
  5. 然后主 goroutine 写入后,另外一个 case 也返回,循环结束

channel 与 select 结合的常见用途 🔥

利用 default 分支避免阻塞

select 的 default 分支语义:当所有 case 语句里读/写 channel 阻塞时,会执行 default!

无论 channel 是否有 buffer。

有些时候,我们可能不希望阻塞在写入 channel 上,那可以利用 select default 的特性,这样封装一个函数,当写入阻塞时,返回一个 false,让外界可以处理阻塞的情况:

func tryWriteChannel(c chan<- int, value int) bool 
	select 
	case c <- value
		return true
	default:	//其他没就绪时,会执行
		return false
	

这样使用:

			//active <- 1		//之前直接写 channel,如果满了,就会阻塞
			writed := tryWriteChannel(active, 1)	//改成这样,可以在阻塞时,处理相关逻辑
			if !writed 
				log.Println("failed to write channel")
				return
			

实现超时

假如我们想在一个 channel 的读/写操作上加一个超时逻辑,可以通过这样实现:
在 select 代码块中,加一个 case,这个 case 会在超时后执行,这样会结束其他 case。

比如这样:

func tryGetSemaphore(c chan<- struct) bool 
	select 
	case c <- struct :
		return true
	case <- time.After(1 * time.Second):		//在写 channel 的基础上,额外加一个情况,超时情况
		log.Println("timeout!!!")
		//1s 后返回,可以在这里做超时处理
		return true
	

及时调用 timer 的 Stop 方法回收 Timer 资源。

心跳机制

循环执行一个额外的 case,这个 case 会定时返回。


func worker() 
  heartbeat := time.NewTicker(30 * time.Second)
  defer heartbeat.Stop()
  for 
    select 
    case <-c:
      // ... do some stuff
    case <- heartbeat.C:
      //... do heartbeat stuff
    
  

time.NewTicker 会创建一个定时执行的心跳,可以把这个 ticker channel 读取的操作放到一个 case 里,这样 select 代码块就会定时执行一次。

ticker 也要及时 Stop。

总结

本文介绍了 Golang 中通过 goroutine channel 和 select 实现并发操作的一些典型场景。

可以看到,通过 goroutine 实现并发是如此的简单;通过 channel 无 buffer 和有 buffer,实现一些 goroutine 同步机制也比较方便;结合 select,实现 goroutine 的统一管理。

在学习一门语言时,既要结合已有的语言知识,也要吸收新语言的设计思想。

需要记住的是,Go 提倡通过 CSP 模型(communicating sequential processes 通信顺序进程)进行通信,而不是传统语言的共享内存方式。

CSP:两个独立的并发实体通过共享 channel(管道)进行通信的并发模型。

我们在遇到多 goroutine 通信、同步的情况,可以尽量多使本文的内容进行处理。

不过对于某些情况,也可以使用 go 提供的 sync 包下的内容,进行局部同步。下篇文章我们就来看看这些内容。

对于局部情况,比如涉及性能敏感的区域或需要保护的结构体数据时,我们可以使用更为高效的低级同步原语(如 mutex),保证 goroutine 对数据的同步访问。

以上是关于Golang 基础:原生并发 goroutine channel 和 select 常见使用场景的主要内容,如果未能解决你的问题,请参考以下文章

Golang基础_11-并发concurrency

sync:与golang的并发息息相关的包

Golang百万级高并发实例

golang goroutine例子[golang并发代码片段]

GO的并发之道-Goroutine调度原理&Channel详解

golang 碎片整理之 并发