浅谈 golang channel
Posted 点融黑帮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅谈 golang channel相关的知识,希望对你有一定的参考价值。
golang(后文简称go)在语言层面提供了goroutine这种并发机制,goroutine底层基于I/O复用并可以有效利用多核CPU,多线程/多进程的应用意味着我们需要处理进程通信,常见的IPC方式包括共享内存和消息传递,两种方式都有各自的应用场景,而这两种方式在go中都有相应的实现,go为前者提供了sync包用于同步操作,为后者实现了channel用于goroutine之间的通信,下面将简单介绍go channel的基本用法。
channel 分为无缓冲(unbuffered)和缓冲的(buffered),语法上的区别在于声明时的长度,并且默认是可读可写的,关键字chan后面跟上相应的类型表示仅可读写此类型的值,另外加上箭头表示只读或只写。
c0 := make(chan int) // unbuffered c1 := make(chan int, 0) // unbuffered c2 := make(chan string, 1) // buffered c3 := make(<-chan int) // can only read c4 := make(chan<- int) // can only write c5 := make(chan int) // can write and read
使用箭头 <- 来读写一个channel,无缓冲的channel 常用于同步的场景,当向一个无缓冲的channel 发送消息,另一端没有人接收,或者当从一个无缓冲的channel 接收消息,另一端没有人发送时,当前goroutine都会阻塞。下面是一个简单的例子,主goroutine会在接收c的时候阻塞,直到匿名函数中向c写入一个0。
c := make(chan int)
go func(ch chan int) { ch <- 0 }(c) <-c
也可以使用range读取一个channel,循环会持续到channel关闭
c := make(chan int)
go func(ch chan int) { c <- 1 close(c) }(c)
for i := range c { fmt.Println(i) }
而缓冲的channel只有当缓冲区满或者空,写入和读取的时候才会发生阻塞,所以缓冲的channel可以当做简单的队列来使用
c := make(chan int, 10)
c <- 1
c <- 2
fmt.Println(<-c, <-c)
如果以上代码稍作更改,在channel为空时再读取一次,则会发生死锁,代码如下
c := make(chan int, 10)
c <- 1
c <- 2
fmt.Println(<-c, <-c,<-c)
而运行错误信息为
fatal error: all goroutines are asleep - deadlock!
死锁的原因在于channel的另一端没有goroutine写入了,当缓冲的channel为空时,主goroutine会一直阻塞在读取,同时也没有其他goroutine可以调度。
go提供了一个内置的方法close()用于关闭一个channel,需要注意的是:
只能关闭一个双向或者可写的channel。
对于同一个channel,多次调用close(),会导致panic。
对一个已关闭的channel写数据,会导致panic。
从一个已关闭的channel中读数据,不会panic,会读到channel对应类型的0值,比如int为0,bool就为false,但是这样无法确定读取到的是否是正确的数据,所以一般会使用channel返回的第二个可选参数来判断channel是否关闭。
c := make(chan int)
close(c)
if _, ok := <-c; !ok {
fmt.Println("channel closed")
}
鉴于向一个已关闭的channel发送数据会导致panic,所以一般由发送者调用close()关闭channel,因为发送数据的一方清楚什么时候应该停止数据的写入;同时,在channel关闭之后,所有因为读取这个channel而阻塞的goroutine会立即往后执行,利用这一点可以实现简单的广播。
stopCh := make(chan struct{})
for i := 0; i < 5; i++ {
go func(i int) {
<-stopCh
fmt.Printf("goroutine %d stopped\n", i)
}(i)
}
close(stopCh)
time.Sleep(time.Second * 1)
以上代码在主goroutine中关闭channel,其他goroutine会立即退出
goroutine 2 stopped
goroutine 4 stopped
goroutine 1 stopped
goroutine 0 stopped
goroutine 3 stopped
需要注意的一种情况是只声明未赋值的channel,即 nil channel,close 一个nil channel会导致panic,而读写一个nil channel会永久阻塞。
从一个无缓冲的channel读取数据会阻塞,如果需要从多个channel读取数据呢?这个时候就需要配合select关键词使用
select关键字的灵感来源与unix中的I/O多路复用函数select(现已被epoll、kqueue等替代),unix中的select函数监听多个文件描述符,当select返回时,会得到可读或可写的描述符集合,这种技术实现了在一个线程内处理多个套接字连接。
在go中使用select - case 可以在多个channel上监听读写事件,某个case产生了读写事件时,则执行相应case中的代码
下面例子中第二个case从c1读取,c1另一端有一个goroutine写入,所以执行第二个case中的代码
c1 := make(chan int)
c2 := make(chan int)
go func() {
c1<-1
}()
select {
case c2 <- 1:
fmt.Println("write to c2")
case <-c1:
fmt.Println("read from c1")
}
在实际的应用场景中,需要循环对多个channel中的某一个读写数据,比如服务器编程中,经常使用for - select的方式循环检测多个channel的事件,下面是一个简单的tcp服务的例子。
在主goroutine中使用for select 循环检测msg和sig的事件,并优先匹配系统信号的事件。
func handleConn(conn net.Conn, msg chan string) {
defer conn.Close()
io.WriteString(conn,"hello")
msg <- conn.RemoteAddr().String()
}
func main() {
msg := make(chan string)
sig := make(chan os.Signal)
signal.Notify(sig, os.Interrupt)
go func() {
l, err := net.Listen("tcp", ":8000")
if err != nil {
panic(err)
}
for {
conn, err := l.Accept()
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
continue
} else {
panic(err)
}
}
go handleConn(conn, msg)
}
}()
for {
select {
case <-sig:
fmt.Println("exit")
os.Exit(0)
case addr := <-msg:
fmt.Println(addr)
}
}
}
在web开发的时候,可以很容易地使用关键字go将长耗时的任务放到其他goroutine中执行,比如
http.HandleFunc("/", func(rw http.ResponseWriter, r *http.Request) { //long time task
go func() { //...
}()
rw.WriteHeader(http.StatusOK)
})
但是这样做的缺点也很明显:无法控制并发数量,标准库的http server在一个单独的goroutine中处理每一个用户请求,每个请求又可能会创建1到多个goroutine,如果某一段时间用户请求量很高,会导致服务器在短时间内创建大量的goroutine,在高并发的场景中,这样做是会影响性能的,这个时候,我们希望用一些手段来控制并发数量,比较常见的做法是使用queue+worker pool的方式,这样后台可以任意调整worker的数量来控制任务的处理速度。
以下是一个简单的worker pool,以一个缓冲的channel作为任务队列,用一个函数来表示相应的任务,worker循环从queue中读取并执行相应的任务,直到收到stop的信号。
type Task func()
type Pool struct {
maxWorkers int
queue chan Task
done chan struct{}
}
func (p Pool) Start() {
for i := 0; i < p.maxWorkers; i++ {
worker := Worker{i}
go worker.Consume(p.queue, p.done)
}
}
func (p Pool) Stop() {
close(p.done)
}
type Worker struct{ id int }
func (w Worker) Consume(queue chan Task, stopCh chan struct{}) {
for {
select {
case <-stopCh:
fmt.Printf("worker %d stopped\n", w.id)
return
case task := <-queue:
task()
}
}
}
func main() {
done := make(chan struct{})
queue := make(chan Task, 100)
for i := 0; i < 100; i++ {
i := i
queue <- func() {
fmt.Println(i)
} //just print something
}
pool := Pool{
maxWorkers: 5,
queue: queue,
done: done,
}
pool.Start() //stop pool after 5 seconds
time.Sleep(time.Second * 5)
pool.Stop()
}
go 1.5之后,GOMAXPROCS被默认设置为CPU的核心数量,goroutine会被调度到多个系统线程之上,这意味着我们可以利用多核CPU的性能做一些并行运算,比如:计算10000个文件的MD5值,生成10W个图片的缩略图等等,在这些场景中,我们可以将任务切分为小块,分派个多个goroutine执行,最后通过channel将结果汇合到一起。
下面是一个使用蒙特卡洛法计算PI近似值的一个例子,基本的思想是利用圆的面积与其外接正方形的面积之比为PI/4,通过产生大量均匀分布的二维点,计算落在圆和正方形内的点的比例,再乘以4,就可以得到PI的近似值,随着样本数量的增加,结果会越来越接近PI。
首先是一个单核的版本
func MonteCarloPI(samples int) float64 {
inside := 0 //indicates point is inside the circle
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < samples; i++ {
x := r.Float64()
y := r.Float64()
if x*x+y*y < 1 {
inside++
}
}
return float64(inside) / float64(samples) * 4}
func main() {
fmt.Println(MonteCarloPI(10000000))
fmt.Println(MonteCarloPI(100000000))
fmt.Println(math.Pi)
}
结果如下,前两个分别为一千万和一亿样本时的结果,最后一行为go标准库math中的PI
3.1417284 3.1416146 3.141592653589793...
当样本数量为1亿时,耗时已经达到了数秒钟。以下例子取CPU核心数作为worker的数量,每个goroutine计算出一个PI的近似值,通过results这个channel汇集goroutine计算的结果,最后求平均值。
func MonteCarloMultiCore(samples int) float64 {
workers := runtime.NumCPU()
results := make(chan float64, workers)
threadSamples := samples / workers
for i := 0; i < workers; i++ {
go func() {
results<-MonteCarloPI(threadSamples)
}()
}
var total float64
for i := 0; i < workers; i++ {
total += <-results
}
return total / float64(workers)
}
在10亿的样本情况下,单核和多核的结果如下,可以看到提升是很明显的
MonteCarloPI 27.233842675s
MonteCarloMultiCore 5.598302862s
更多关于channel的资料,可以参考如下链接
Effective go
(https://golang.org/doc/effective_go.html#concurrency)
Go concurrency patterns
(https://blog.golang.org/pipelines)
Concurrency is not parallelism
(https://blog.golang.org/concurrency-is-not-parallelism)
以上是关于浅谈 golang channel的主要内容,如果未能解决你的问题,请参考以下文章
golang 片段7 for https://medium.com/@francesc/why-are-there-nil-channels-in-go-9877cc0b2308
golang goroutine例子[golang并发代码片段]