Golang channel源码分析
Posted 非典型程序员
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang channel源码分析相关的知识,希望对你有一定的参考价值。
一.简介
channel 是Go中的一等公民类型。channel 是由 Hoare 的 CSP 派生的同步原语之一,是实现CSP并发模型的关键。是用来实现并发同步的工具,channel与goroutine让Go的并发编程的两大基石,让并发编程变得异常简单方便。
当一个资源需要在 goroutine 之间共享时,channel 在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。Go中的channel,可以看作是一个FIFO队列,goroutine可以往chan里面发送和接收数据。随着值的传递,数据的所有权在读写channel的协程间进行转换。
Go语言设计团队的首任负责人Rob Pike对并发编程有一个建议。通道机制就是这种哲学的一个设计结果
Do not communicate by sharing memory; instead, share memory by communicating
不要让计算通过共享内存来通讯,而应该让它们通过通讯来共享内存
Go中除了channel,也支持传统方式,如锁、原子函数、并发数据结构来实现并发同步。Go中不并强制你使用哪种并发同步的机制,使用chan的好处就是可以让代码看起来更加简洁,不容易出错。
channel的本质是值的拷贝
Remember all transfer of value on the go channels happens with the copy of value.
二.数据结构(hchan)
channel的底层数据构对应一个*hchan。数据结构定义如下。
主要是一个缓冲区队列(内存一次分配)、两个goroutine等待队列、是否关闭标识、读写索引、锁等。
channel底层还是通过锁来实现的
//src/runtime/chan.go:hchan
type hchan struct {
/*
chan里的元素个数
*/
qcount uint
/*
缓冲区循环队列的大小
*/
dataqsiz uint
/*
是一个指针
针对带缓冲区的chan,那么这里指向底层循环数组
非缓冲的chan是没有的
*/
buf unsafe.Pointer
/*
chan中单个元素的大小
*/
elemsize uint16
/*
chan是否被关闭
*/
closed uint32
/*
元素的类型
*/
elemtype *_type
/*
已发送元素在循环数组中的索引(相对于底层数组的索引,接收下一个元素的索引)
*/
sendx uint
/*
已接收元素在循环数组中的索引(相对于底层数组的索引,下一个会返回的元素的索引)
*/
recvx uint
/*
等待接收的 goroutine 队列(阻塞的goroutine)
*/
recvq waitq
/*
等待发送的 goroutine 队列(阻塞的goroutine)
*/
sendq waitq
/*
锁
保证chan里面元素的读写都是原子的
*/
lock mutex
}
/*
链表节点--双向链表
waitq 是 sudog 的双向链表
sudog 是 goroutine 的封装
*/
type waitq struct {
/*
头节点
*/
first *sudog
/*
尾节点
*/
last *sudog
}
//src/runtime/runtime2.go
/*
读取:sudog.elem 指向 val
val <- ch
发送:sudog.elem 指向 val
ch <- y
*/
type sudog struct {
//....
g *g //阻塞的goroutine
elem unsafe.Pointer //对应的元素
c *hchan //阻塞的channel
}
三.创建chan
// 无缓冲通道
noCacheChan := make(chan TYPE)
// 带缓冲通道
cacheChan := make(chan TYPE, SIZE)
我们一般使用make关键字来创建一个chan,这里其实是一个语法糖,在编译的时候会调用对应的创建channel的方法。虽然有两种不同的创建方式,但最终调用的都是makechan方法,只有size参数有别罢了。
_ = make(chan int)
如上我们创建了一个chan,查看对应的汇编代码,我们发现其实是调用了runtime.makechan方法。
那么在编译的时候具体是怎么转换的呢?
首先编译器会将 make 设置成OMAKE节点,再根据元素类型判断,如果是TCHAN类型,那么将op类型重置为OMAKECHAN,并且设置缓冲区大小参数
// src/cmd/compile/internal/gc/typecheck.go
func typecheck1(n *Node, top int) (res *Node) {
switch n.Op {
case OMAKE:
//...
//...
//...
switch t.Etype {
//...
//...
//...
case TCHAN:
l = nil
if i < len(args) {
//带缓冲区
n.Left = l
} else {
//不带缓冲区
n.Left = nodintconst(0)
}
n.Op = OMAKECHAN
}
n.Type = t
//...
//...
//...
return n
}
然后再将OMAKECHAN类型转换为makechan方法
//src/cmd/compile/internal/gc/walk.go
func walkexpr(n *Node, init *Nodes) *Node {
//...
//...
//...
switch n.Op {
//...
//...
//...
case OMAKECHAN:
//这里执行make方法的转换
size := n.Left
fnname := "makechan64"
argtype := types.Types[TINT64]
if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
fnname = "makechan"
argtype = types.Types[TINT]
}
n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
//......
return n
}
下面我们看下chan创建的源码。
func makechan(t *chantype, size int64) *hchan
该方法返回一个hchan的一个指针。
/*
chan创建
t:chan的相关类型信息
size:缓冲区大小
创建hchan
如果chan中元素不含有指针,也就是不含有GC感兴趣的指针的时候
*/
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// element单元素大小的校验
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
//获取需要分配的内存
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// 校验###############
var c *hchan
// hchan空间开辟
switch {
/*
非缓冲区chan,或者元素大小为0
直接开辟hchanSize大小即可
*/
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
/*
chan中元素不包含指针
一次性分配hchan和buf的空间
这种情况,gc不会对 channel 中的元素进行 scan
*/
case elem.ptrdata == 0:
//申请 hchanSize+元素大小*元素个数的连续内存
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
//buf 指向 hchan 后边的地址
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
/*元素中包含指针
调用了两次分配空间的函数
*/
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
//记录单个元素的大小
c.elemsize = uint16(elem.size)
//元素的类型
c.elemtype = elem
//设置缓冲区队列大小
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
总结下
无缓冲或者缓冲的元素类型为 struct{} 时,并不会为缓冲区分配内存空间
缓冲的元素结构中不包含指针时,会将 hchan 和 缓冲区buf 放在一块连续的内存空间中,避免GC的扫描
缓冲区的内存空间是在chan创建的时候一次性分配的,并非按需分配等
chan的创建都是调用mallocgc在堆上创建的,GC会回收此空间,也就是无论chan是否被显示的关闭,在没有引用的情况下,就可能被GC回收
四.往chan发送数据
ch:=make(chan int,9999)
ch<-1 //阻塞发送
select {
case ch<-2://非阻塞发送
default:
}
查看汇编代码,发现实际调用了
runtime.makechan、runtime.chansend1两个方法
阻塞发送时,如果缓冲chan已经满了,或者非缓冲chan但是没有成对的接收者时,该goroutine会被挂起
//阻塞发送
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
//select下的非阻塞发送
/*
select {
case c <- v:
... foo
default:
... bar
}
as
if selectnbsend(c, v) {
... foo
} else {
... bar
}
*/
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
无论是阻塞发送还是非阻塞发送,最终都是调用了chansend方法
下面看下发送的源代码
/*
往chan中发送数据
*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
/*
如果chan为nil
*/
if c == nil {
//非阻塞,直接返回false,不会阻塞
if !block {
return false
}
//永远挂起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
/*
快速校验,如果非阻塞写入,并且chan没有关闭且缓冲区已经满了,直接返回
无需下面的加锁操
*/
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//加锁
lock(&c.lock)
/*
如果往一个已经关闭的chan中发送数据,直接panic
*/
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
/*
从接收队列中找一个goroutine
对应两种case
1.非缓冲队列,有消费者被挂起
2.缓冲chan,且缓冲数据已经被消费干净,且有消费者被挂起
如果找到,那么直接把数据从send goroutine copy到 recv goroutine
*/
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
//处理缓冲chan并且还有缓冲空间的情况。当前缓存的元素<缓冲队列大小
if c.qcount < c.dataqsiz {
//根据发送游标确认将发送的数据放到对应地址
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
//将发送的数据放到sendx处
typedmemmove(c.elemtype, qp, ep)
//移动发送游标
c.sendx++
//如果到尾巴了,重置游标
if c.sendx == c.dataqsiz {
c.sendx = 0
}
//缓存个数+1
c.qcount++
//解锁
unlock(&c.lock)
return true
}
//如果是非阻塞的发送,解锁后直接返回false,即未发送成功
if !block {
unlock(&c.lock)
return false
}
//挂起当前线程
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
//保存数据
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
//当前goroutine入队
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
//挂起当前 goroutine
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
//当前线程被其它接收线程唤醒的后续操作
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
//如果缓冲队列大小为0
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
//游标处理
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx
}
}
//sg.elem 指向接收到的值存放的位置,如 val <- ch,指的就是 &val
if sg.elem != nil {
//地址拷贝
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//唤醒其它线程
goready(gp, skip+1)
}
总结下:
往一个nil chan中发送数据,如果是非阻塞发送,直接返回发送失败;如果是阻塞发送,那么永久挂起该goroutine
直正的发送操作是需要加锁解锁的,锁的粒度还是比较大的。为了在某些情况下可以不用加锁,增加了一个判断,如果非阻塞发送&&chan未关闭&&缓冲已经满了,那么直接返回false
加锁后,在真正发送前,会判断该chan是否已经被关闭了,如果往一个已经关闭的chan发数据会panic
如果发送的时候,可以找到被阻塞的读取goroutine,说明缓冲区没有缓冲数据了(非缓冲chan,或者缓冲chan已经空了)那么直接把数据从发送goroutine copy到 读取goroutine中,数据无需经过缓冲区buf来进行中转
如果有缓冲队列中有空间可以放,那么将数据放到对应下标处
五.从chan读取数据
chan中数据读取从反汇编码看调用了runtime.chanrecv1或runtime.selectnbrecv方法
func main() {
ch := make(chan int)
//阻塞读取
<-ch
select {
//非阻塞读取
case <-ch:
default:
}
}
无论何种读取方式最终都是调用了chanrecv方法,received 这个字段来反应 channel 是否被关闭
// <- c 这种形式的阻塞读取
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
// val,ok<-c这种形式阻塞读取
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
//
// select {
// case v = <-c:
// ... foo
// default:
// ... bar
// }
//
// as
//
// if selectnbrecv(&v, c) {
// ... foo
// } else {
// ... bar
// }
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
selected, _ = chanrecv(c, elem, false)
return
}
//
// select {
// case v, ok = <-c:
// ... foo
// default:
// ... bar
// }
//
// as
//
// if c != nil && selectnbrecv2(&v, &ok, c) {
// ... foo
// } else {
// ... bar
// }
//
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
// TODO(khr): just return 2 values from this function, now that it is in Go.
selected, *received = chanrecv(c, elem, false)
return
}
//最终都是转到了该方法
/*
c:hchan数据结构
ep:返回值的val地址
block:是否阻塞
*/
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
下面看一下读取方法的源码
/*
chan读取
该方法在通道c上接收并将接收到的数据写到ep指向的地址
如果ep等于nil则接收到的数据将被忽略。
block是阻塞的意思,如果block等于false,即无需阻塞且没有可用元素,则返回(false, false)。
否则,如果c是关闭的,那么将*ep地址归0,并返回(true, false)。
否则,返回地址写入ep。返回 (true, true)
非空的ep必须指向堆或调用者的栈
*/
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
/*
chan等于nil时
如果非阻塞时,返回false,false
如果姐塞,那么挂起该线程。
但<-ch从chan接收时是一个阻塞的操作,这就是为什么从一个nil chan获取数据的时候会阻塞的原因
*/
if c == nil {
if !block {
return
}
//永久挂起该线程
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
//永远不会被执行到
throw("unreachable")
}
/*
阻塞接收时,对于未关闭 channel 满足一些条件不需要加锁就可以直接返回
如果非阻接收 且 chan为空时,即此时chan没有数据可以接收
*/
if !block && empty(c) {
//先检查关闭状态,如果chan没有关闭,那么返回false,false
if atomic.Load(&c.closed) == 0 {
return
}
//再次检查如果为空,那么返回true,false
if empty(c) {
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//加锁
lock(&c.lock)
/*
如果chan已经被关闭 并且元素个数为0
*/
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
//解锁
unlock(&c.lock)
/*
从一个已关闭的 channel 执行接收操作,且未忽略返回值的时候
将ep清零,返回零值
*/
if ep != nil {
//根据类型来将对应的内存地址清零
typedmemclr(c.elemtype, ep)
}
//返回
return true, false
}
/*
先看下send队列里有没有可用的发送者,
如果有说明buf是满的,或者是非缓冲的队列,那么直接从发送者来获取数据
如果是非缓冲队列,直接进行内存拷贝,从发送goroutine到接收goroutine
如果buf是满的,接收缓冲队列的头部元素,并将发送者的元素放到循环数组尾部
*/
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
/*
没有send队列但是有可以接收的元素
*/
if c.qcount > 0 {
//根据接收到下标从buf中取到对应的地址
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
/*
如果没有忽略接收值,那么将返回值的地址放到ep中
*/
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
//清除缓冲队列中对应的元素
typedmemclr(c.elemtype, qp)
//接收下标++
c.recvx++
//如果下一个下标等于缓冲区大小,重置下标,因为是一个环状
if c.recvx == c.dataqsiz {
c.recvx = 0
}
//读出一个,就将元素个数减少一个
c.qcount--
//减少一个
unlock(&c.lock)
//返回
return true, true
}
//非阻塞读出,解锁,返回
if !block {
//解锁
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
/*
如果没有可用的发送者,也没有可用的缓冲,那么当前线程需要阻塞到该chan上
创建一个sudog
*/
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
//保存ep地址
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
//入队,放到接收队列里面
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
//挂起当前线程
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
//被其它线程唤醒后,继续操作
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
/*
有可用的发送goroutine时,进行的读取操作
如果是非缓冲队列,直接进行内存拷贝,从发送goroutine到接收goroutine
如果buf是满的,接收缓冲队列的头部元素,并将发送者的元素放到循环数组尾部
*/
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//如果循环队列为空,说明是一个非缓冲的队列,
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
//从send里面直接拷贝出数据到接收goroutine中
recvDirect(c.elemtype, sg, ep)
}
//此分支是缓冲的chan并且缓冲已经满了
} else {
/*
从队列头部获取数据,当该发送者的数据放到队列尾部,由于队列是满的,头尾对应同一个槽
从获取的下标拿到地址
*/
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
//如果没有忽略返回值,那么copy到ep中
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
//把sender的数据拷贝到缓冲队列的属部
typedmemmove(c.elemtype, qp, sg.elem)
//调整接收游标
c.recvx++
//如果到尾巴了,重置游标
if c.recvx == c.dataqsiz {
c.recvx = 0
}
//重置发送游标
c.sendx = c.recvx
}
sg.elem = nil
gp := sg.g
//解锁
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//唤醒发送的goroutine
goready(gp, skip+1)
}
总结
如果阻塞读取一个nil chan的时候,goroutine是会被永久挂起,但不会阻塞在select中,因为select中的读取是非阻塞的
如果chan已经被关闭了,但是还有缓冲数据,那么仍然是可以读取的
六.关闭chan
关闭chan使用close进行关闭,关闭是一个非必要的操作。但是往一个已经关闭的chan中发送数据,会出现panic。关闭chan的操作一般仅作为一个事件通知,通常情况下,我们不要在接收端关闭chan,避免引发不必要的panic
/*
关闭chan
*/
func closechan(c *hchan) {
/*
关闭一个nil的chan的时候会panic
*/
if c == nil {
panic(plainError("close of nil channel"))
}
//加锁
lock(&c.lock)
//关闭一个已经关闭的chan会出现panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
//设置关闭标识
c.closed = 1
var glist gList
/*
释放所有接收goroutine
*/
for {
sg := c.recvq.dequeue()
//直到没有
if sg == nil {
break
}
if sg.elem != nil {
//清除数据
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
//形成链表
glist.push(gp)
}
/*
唤醒所有的发送者,后续这些发送都会panic
*/
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
//重置
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
//形成链表
glist.push(gp)
}
//解锁
unlock(&c.lock)
//遍历链表按个唤醒
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
关闭 nil channel 会 panic
关闭已关闭的 channel 会 panic
关闭操作会将待接收者的接收值置为空值,唤醒所有待发送者和待接收者
以上是关于Golang channel源码分析的主要内容,如果未能解决你的问题,请参考以下文章
golang 片段7 for https://medium.com/@francesc/why-are-there-nil-channels-in-go-9877cc0b2308