go 中 限制 goroutine 数量以及使用协程池

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go 中 限制 goroutine 数量以及使用协程池相关的知识,希望对你有一定的参考价值。

参考技术A Golang 开发需要协程池吗

goroutine 创建需要占用一定量的内存,开一个goroutine 只需要少量的内存空间,几KB,这也是golang能实现百万长链的原因.

但在实际中,goroutine 需要正确的关闭,而不是无限创建后,造成goroutine 泄露,进而引发系统崩溃

网友实现的方案: https://github.com/remeh/sizedwaitgroup

使用:

有些第三方库实现了协程池,同样也可以达到限制协程数量的目的,比如:

七天入门Go语言 通道 & Goroutine | 第四天 并发编程

1. 前言

在go社区有这样一句话

不要通过共享内存来通信,而是通过通信来共享内存。


go官方是建议使用管道通信的方式来进行并发。

通道 是用于协程间交流的通信载体。严格地来说,通道就是数据传输的管道,数据通过这根管道被 “传入” 或被 “读出”。 因此协程可以发送数据到通道中,而另一个协程可以从该通道中读取数据。

在这里就要引入一个新名词:协程
将线程再细分为多个协程,比如说是一条流水线上的多人协作。那么就可以减少各个线程内部的等待时间。

2. 通道简介

Go 提供一个 chan 关键词去创建一个通道。一个通道只能传入一种类型的数据,其他的数据类型不允许被传输。

将线程再分成更细的协程,使得中间等待时候更少,提高效率!

2.1 声明

package main

import "fmt"

func main(){
    var channel chan int //声明了一个可以传入 int 类型数据的通道 channel 。
    fmt.Println(channel)  
    //程序会打印nil, 因为通道的 0 值是 nil。
}

一个 nil 通道是没有用的。你不能向它传递数据或者读取数据。
因此,我们必须使用 make 函数器创建一个可以使用的通道。

package main

import "fmt"

func main(){
    channel := make(chan int) 
    //声明了一个可以传入 int 类型数据的通道 channel 。
    fmt.Println(channel)  
    //程序会打印channel的地址。 0xc0000180c0
}

它是一个指针内存地址。通道变量默认是一个指针。多数情况下,当你想要和一个协程沟通的时候,你可以给函数或者方法传递一个通道作为参数。当从协程接收到通道参数后,你不需要再对其进行解引用就可以从通道接收或者发送数据。

2.1 读写

Go 语言提供一个非常简洁的左箭头语法 <- 去从通道读写数据。

有变量接受管道值

  channel <- data

上面的代码意味着我们想要把 data 数据推入到通道 channel 中,注意看箭头的指向。
它表明是从 data数据 to到 通道 channel。
因此我们可以当作我们正在把 data 推入到通道 channel。

无变量接受管道值

<- data

这个语句不会把数据传输给任何变量,但是仍然是一个有效的语句。

上面的通道操作默认是阻塞的。

  • 在以前的课程中,我们知道可以使用 time.Sleep 去阻塞一个通道。通道操作本质上是阻塞的。当一些数据被写入通道,对应的协程将阻塞直到有其他的协程可以从此通道接收数据。
  • 通道操作会通知调度器去调度其他的协程,这就是为什么程序不会一直阻塞在一个协程。通道的这些特性在不同的协程沟通的时候非常有用,它避免了我们使用锁或者一些 hack 手段去达到阻塞协程的目的。

2.3 通道详解

2.3.1 例子

package main

import "fmt"

func Rush(c chan string) {
	fmt.Println("Hello "+ <-c + "!")
	// 声明一个函数 greet, 这个函数的参数 c 是一个 string 类型的通道。
	// 在这个函数中,我们从通道 c 中接收数据并打印到控制台上。
}


func main(){
	fmt.Println("Main Start") 
	// main 函数的第一个语句是打印 main start 到控制台。
	channel := make(chan string)
	// 在 main 函数中使用 make 函数创建一个 string 类型的通道赋值给 ‘ channel ’ 变量
	go Rush(channel)    	
	// 把 channel 通道传递给 greet 函数并用 go 关键词以协程方式运行它。
	// 此时,程序有两个协程并且正在调度运行的是 main goroutine 主函数 
	channel <- "DEMO"   	
	// 给通道 channel 传入一个数据 DEMO.
	// 此时主线程将阻塞直到有协程接收这个数据. Go 的调度器开始调度 greet 协程接收通道 channel 的数据 
	fmt.Println("Main Stop")   
	// 然后主线程激活并且执行后面的语句,打印 main stopped
}
/*
Main Start
Hello DEMO!
Main Stop
*/

2.3.2 死锁

当通道读写数据时,所在协程会阻塞并且调度控制权会转移到其他未阻塞的协程。

  • 如果当前协程正在从一个没有任何值的通道中读取数据,那么当前协程会阻塞并且等待其他协程往此通道写入值。
  • 因此,读操作将被阻塞。类似的,如果你发送数据到一个通道,它将阻塞当前协程直到有其他协程从通道中读取数据。此时写操作将阻塞 。

下面是一个主线程在进行通道操作的时候造成死锁的例子

package main

import "fmt"

func main() {
	fmt.Println("main start")
	// main 函数的第一个语句是打印 main start 到控制台。
	channel := make(chan string)
	// 在 main 函数中使用 make 函数创建一个 string 类型的通道赋值给 ‘ channel ’ 变量
	channel <- "GoLang"
	// 给通道 channel 传入一个数据 DEMO.
	// 此时主线程将阻塞直到有协程接收这个数据. Go 的调度器开始调度协程接收通道 channel 的数据
	// 但是由于没有协程接受,没有协程是可被调度的。所有协程都进入休眠状态,即是主程序阻塞了。
	fmt.Println("main stop")
}

/*
报错
main start
fatal error: all goroutines are asleep - deadlock!  //所有协程都进入休眠状态,死锁

goroutine 1 [chan send]:
main.main()
*/

2.3.3 关闭通道

package main

import "fmt"

func RushChan(c chan string) {
	<- c
	fmt.Println("1")
	<- c
	fmt.Println("2")
}

func main() {
	fmt.Println("main start")
	c := make(chan string, 1)
	go RushChan(c)
	c <- "Demo1"
	close(c)
	/*
	不能向一个关了的channel发信息
	main start
	panic: send on closed channel
	*/
	c <- "Demo2"
	//close(c)
	/*
	close 放这里的话可以
	main start
	1
	2
	Main Stop
	*/
	fmt.Println("Main Stop")
}
  • 第一个操作 c <- "Demo2" 将阻塞协程直到有其他协程从此通道中读取数据,因此 greet 会被调度器调度执行。
  • 第一个操作 <-c 是非阻塞的 因为现在通道c有数据可读。
  • 第二个操作 <-c将被阻塞因为通道c已经没数据可读.
  • 此时main协程将被激活并且程序执行close(c)关闭通道操作。

2.3.4 缓冲区

c := make(chan Type, n)
  • 当缓冲区参数不是 0 的时候。协程将不会阻塞除非缓冲区被填满
  • 当缓冲区满了之后,想要再往缓冲区发送数据只有等到有其他协程从缓冲区接收数据, 此时的发送协程是阻塞的。
  • 有一点需要注意, 读缓冲区的操作是渴望式读取,意味着一旦读操作开始它将读取缓冲区所有数据,直到缓冲区为空。
  • 原理上来说读操作的协程将不会阻塞直到缓冲区为空。
package main

import "fmt"

func RushChan(c chan string) {
	for {
		val ,_ := <-c
		fmt.Println(val)
	}
}

func main() {
	fmt.Println("Main Start")
	c := make(chan string, 1)
	go RushChan(c)
	c <- "Demo1" //结果1
	//c <- "Demo2"  //结果2
	fmt.Println("Main Stop")
}
/*
结果1:
Main Start
Main Stop
*/

/*
结果2:
Main Start
Join
Mike
Main Stop
*/

  • 由于这是一个缓冲的通道,当我只有c <- Demo1的时候,这里面只是满了,但是是不会阻塞的。所以子协程接受到了这个数据Demo1,但是由于是非阻塞,所以主线程没有被阻塞,并没有等子协程完成就结束了,结果1就是这样出现了。
  • 当加多一个c <- Demo2 的时候,这时就要等缓冲区空了,也就是等有协程把Demo1读取,所以就会导致主线程阻塞,此时的结果就是结果2了。
package main

import "fmt"

func RushChan(c chan string) {
	for {
		val ,_ := <-c
		fmt.Println(val)
	}
}

func main() {
	c := make(chan int,3)
	c <- 1
	c <- 2
	c <- 3
	close(c)
	for elem := range c {
		fmt.Println(elem)
	}
}
  • 这里虽然关闭了通道,但是其实数据不仅在通道里面,数据还在缓冲区中的,我们依然可以读取到这个数据。

2.3.5 通道的长度和容量

和切片类似,一个缓冲通道也有长度和容量。
通道的长度是其内部缓冲队列未读的数据量,而通道的容量是缓冲区可最大盛放的数据量。
我们可以使用 len 函数去计算通道的长度,使用 cap 函数去获得通道的容量。和切片用法神似

package main

import "fmt"

func RushChan(c chan string) {
	for {
		val ,_ := <-c
		fmt.Println(val)
	}
}

func main() {
	c := make(chan int,3)
	c <- 1
	c <- 2
	fmt.Println("长度: ",len(c))
	fmt.Println(<-c)
	fmt.Println("长度: ",len(c))
	fmt.Println(<-c)
	fmt.Println("长度: ",len(c))
	fmt.Println("容量: ",cap(c))
}
/*
结果:

长度:  2
1
长度:  1
2
长度:  0
容量:  3

*/
  • 这个 c 通道容量为 3,但只盛放了 2 个数据。Go 就不用去阻塞主线程去调度其他协程。
  • 你也可以在主线程中去读取这些数据,因为虽然通道没有放满,也不会阻止你去从通道读取数据。

2.3.6 单向通道

目前为止,我们已经学习到可以双向传递数据的通道,或者说,我们可以对通道做读操作和写操作。但是事实上我们也可以创建单向通道。比如只读通道只允许读操作,只写通道只允许写操作。

单向通道也可以使用 make 函数创建,不过需要额外加一个箭头语法。

roc := make(<-chan int)
soc := make(chan<- int)

在上面的程序中, roc 是一个只读通道,<- 在 chan 关键词前面。 soc is 只写通道,<- 在 chan 关键词后面。 他们也算不同的数据类型。

但是单向通道有什么作用呢 ?

使用单向通道可以 提高程序的类型安全性, 使得程序不容易出错。

但是假如你在一个协程中只需要读操作某通道,但是在主线程中却需要读写操作这个通道该怎么办呢?
幸运的是 Go 提供了一个简单的语法去把双向通道转化为单向通道。

package main

import "fmt"

func greet(roc <-chan string) {
	fmt.Println("Hello " + <-roc ,"!")
}

func main() {
	fmt.Println("Main Start")
	c := make(chan string)
	go greet(c)
	c <- "Demo"
	fmt.Println("Main Stop")
}
/*
结果
Main Start
Hello Demo !
Main Stop
*/

我们修改 greet 协程函数,把参数 c 类型从双向通道改成单向接收通道。

现在我们只能从通道中读取数据,通道上的任何写入操作将会发生错误:
“invalid operation: roc <- “Temp” (send to receive-only type <-chan string)”.

2.3.7 Select

select 和 switch 很像,它不需要输入参数,并且仅仅被使用在通道操作上。
Select 语句被用来执行多个通道操作的一个和其附带的 case 块代码。

原理

让我们来看下面的例子,讨论下其执行原理

package main

import (
	"fmt"
	"time"
)

var start time.Time

func init() {
	start = time.Now()
}

func service1(c chan string) {
	time.Sleep(3 * time.Second)
	c <- "Hello from service 1"
}

func service2(c chan string) {
	time.Sleep(5 * time.Second)
	c <- "Hello from service 2"
}

func main() {
	fmt.Println("main start", time.Since(start))

	chan1 := make(chan string)
	chan2 := make(chan string)

	go service1(chan1)
	go service2(chan2)

	select {
	case res := <-chan1:
		fmt.Println("Response form service 1", res, time.Since(start))
	case res := <-chan2:
		fmt.Println("Response form service 2", res, time.Since(start))
	}

	fmt.Println("main stop ",time.Since(start))
}

/*
结果:
main start 0s
Response form service 1 Hello from service 1 3.0018445s
main stop  3.0019815s
*/

从上面的程序来看,我们知道 select 语句和 switch 很像,不同点是用通道读写操作代替了布尔操作。通道将被阻塞,除非它有默认的 default 块 (之后将介绍)。一旦某个 case 条件执行,它将不阻塞。

所以一个 case 条件什么时候执行呢 ?

如果所有的 case 语句(通道操作)被阻塞,那么 select 语句将阻塞直到这些 case 条件的一个不阻塞(通道操作),case 块执行。
如果有多个 case 块(通道操作)都没有阻塞,那么运行时将随机选择一个不阻塞的 case 块立即执行。

为了演示上面的程序,我们开启两个协程并传入对应的通道变量。然后我们写一个带有两个 case 操作的 select 语句。 一个 case 操作从 chan1 读数据,另外一个从 chan2 读数据。这两个通道都是无缓冲的 , 读操作将被阻塞 。所以 select 语句将阻塞。因此 select 将等待,直到有 case 语句不阻塞。

  • 当程序执行到select语句后,主线程将阻塞并开始调度 service1service2协程。 service1 休眠 3 秒 后未阻塞的把数据写入通道 chan1 与其类似,service2等待 5 秒 后未阻塞的把数据写入通道chan2
  • 因为 service1service2 早一步执行完毕,case 1 将首先调度执行,其他的 cases 块 (这里指 case 2) 将被忽略。 一旦 case 块执行完毕, main 线程将开始继续执行。

所以并没有输出case2的结果

上述程序真实模拟了一个数百万请求的服务器负载均衡的例子,它从多个有效服务中返回其中一个响应。
使用协程,通道和 select 语句,我们可以向多个服务器请求数据并获取其中最快响应的那个。

为了模拟上面哪个 case 块率先返回数据,我们可以直接去掉 Sleep 函数调用。

package main

import (
	"fmt"
	"time"
)

var start time.Time

func init() {
	start = time.Now()
}

func service1(c chan string) {
	c <- "Hello from service 1"
}

func service2(c chan string) {
	c <- "Hello from service 2"
}

func main() {
	fmt.Println("main start", time.Since(start))

	chan1 := make(chan string)
	chan2 := make(chan string)

	go service1(chan1)
	go service2(chan2)

	select {
	case res := <-chan1:
		fmt.Println("Response form service 1", res, time.Since(start))
	case res := <-chan2:
		fmt.Println("Response form service 2", res, time.Since(start))
	}

	fmt.Println("main stop ",time.Since(start))
}

结果一:
main start 0s
Response form service 1 Hello from service 1 539.3µs
main stop 539.3µs
结果二:
main start 0s
Response form service 2 Hello from service 2 0s
main stop 0s

结果一共有2!个不同的结果

为了证明当所有 case 块都是非阻塞的时候,golang 会随机选择一个代码块执行打印 response,我们使用缓冲通道来改造程序。

package main

import (
	"fmt"
	"time"
)

var start time.Time

func init() {
	start = time.Now()
}

func service1(c chan string) {
	c <- "Hello from service 1"
}

func service2(c chan string) {
	c <- "Hello from service 2"
}

func main() {
	fmt.Println("main start", time.Since(start))

	chan1 := make(chan string,2)
	chan2 := make(chan string,2)


	chan1 <- "Value 1"
	chan1 <- "Value 2"

	chan2 <- "Value 1"
	chan2 <- "Value 2"

	select {
	case res := <-chan1:
		fmt.Println("Response form service 1", res, time.Since(start))
	case res := <-chan2:
		fmt.Println("Response form service 2", res, time.Since(start))
	}

	fmt.Println("main stop ",time.Since(start))
}

上述的程序的结果是有不同的

结果一:
main start 0s
Response form service 1 Value 1 496.2µs
main stop 496.2µs
结果二:
main start 0s
Response form service 2 Value 1 0s
main stop 0s

在上面的程序中,两个通道在其缓冲区中都有两个值。因为我们向容量为 2 的缓冲区通道分别发送了两个值,所以这些通道发送操作不会阻塞并且会执行下面的 select 块。 select 块中的所有 case 操作都不会阻塞,因为每个通道中都有两个值,而我们的 case 操作只需要取出其中一个值。因此,go 运行时会随机选择一个 case 操作并执行其中的代码。

2.3.8 default case 块

switch 一样, select 语句也有 default case 块。default case 块 是非阻塞的,不仅如此, default case 块可以使 select 语句永不阻塞,这意味着, 任何通道的 发送 和 接收 操作 (不管是缓冲或者非缓冲) 都不会阻塞当前线程。

如果有 case块的通道操作是非阻塞,那么 select会执行其case 块。如果没有那么 select将默认执行 default块.

package main

import (
	"fmt"
	"time"
)

var start time.Time

func init() {
	start = time.Now()
}

func service1(c chan string) {
	c <- "Hello from service 1"
}

func service2(c chan string) {
	c <- "Hello from service 2"
}

func main() {
	fmt.Println("main start", time.Since(start))

	chan1 := make(chan string)
	chan2 := make(chan string)

	go service1(chan1)
	go service2(chan2)

	select {
	case res := <-chan1:
		fmt.Println("Response form service 1", res, time.Since(start))
	case res := <-chan2:
		fmt.Println("Response form service 2", res, time.Since(start))
	default:
		fmt.Println("No Response received",time.Since(start))
	}

	fmt.Println("main stop ",time.Since(start))
}

/*
结果:
main start 0s
No Response received 0s
main stop  0s
*/
  • 在上面的程序中,因为通道是非缓冲的,case 块的通道操作都是阻塞的,所有 default 块将被执行。

  • 如果上面的 select 语句没有 default 块,select 将阻塞,没有 response 会被打印出来,知道通道变成非阻塞。

  • 如果带有 default, select 将是非阻塞的,调度器将不会从主线程转而调度其他协程。

  • 但是我们可以使用 time.Sleep 改变这一点。 通过这种方式,主线程将把调度权转移到其他协程,在其他协程执行完毕后,调度权从新回到主线程手里。

  • 当主线程重新执行的时候,通道里面已经有值了,case 操作将不会阻塞。

package main

import (
	"fmt"
	"time"
)

var start time.Time

func init() {
	start = time.Now()
}

func service1(c chan string) {
	fmt.Println("service1 start")
	c <- "Hello from service 1"
}

func service2(c chan string) {
	fmt.Println("service2 start")
	c <- "Hello from service 2"
}

func main() {
	fmt.Println("main start", time.Since(start))

	chan1 := make(chan string)
	chan2 := make(chan string)

	go service1(chan1)
	go service2(chan2)

	time.Sleep(3*time.Second)

	select {
	case res := <-chan1:
		fmt.Println("Response form service 1", res, time.Since(start))
	case res := <-chan2:
		fmt.Println("Response form service 2", res, time.Since(start))
	default:
		fmt.Println("No Response received",time.Since(start))
	}

	fmt.Println("main stop ",time.Since(start))
}
/*
结果不唯一。
main start 0s
service2 start
service1 start
Response form service 1 Hello from service 1 3.0006729s
main stop  3.0006729s
*/

2.3.9 空 select

和 for{} 这样的空循环很像,空 select{} 语法也是有效的。但是有一点必须要说明。
我们知道 select 将被阻塞除非有 case 块没有阻塞。因为 select{} 没有 case 非阻塞语句,主线程将阻塞并可能会导致死锁。

package main

import "fmt"

func service() {
	fmt.Println("Hello from service")
}

func main() {
	fmt.Println("main started")
	go<

以上是关于go 中 限制 goroutine 数量以及使用协程池的主要内容,如果未能解决你的问题,请参考以下文章

golang中最大协程数的限制(线程)

推荐很好用的Goroutine连接池

进程线程轻量级进程协程和go中的Goroutine

go语言之行--golang核武器goroutine调度原理channel详解

[转帖]go 的goroutine 以及 channel 的简介.

Go语言——goroutine并发模型