golang源码之channel
Posted better_hui
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang源码之channel相关的知识,希望对你有一定的参考价值。
目录
channel是golang语言goroutine间的通信方式。
一、数据结构
1、hchan
type hchan struct {
qcount uint // 目前队列中的数据数量total data in the queue
dataqsiz uint // 队列的容量size of the circular queue
buf unsafe.Pointer // 队列points to an array of dataqsiz elements
elemsize uint16 //数据的大小
closed uint32
elemtype *_type // 数据类型element type
sendx uint // 发送的Index send index
recvx uint // 接收的Index receive index
recvq waitq // 接收的等待G list of recv waiters
sendq waitq // 发送的等待G list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
2、环形队列
数据缓冲队列,数据结构是一个链表,但是通过sendx 和 recvx ,实现了环形队列结构
3、等待队列
hchan中有两个等待队列:发送的等待队列(sendq)和接收的等待队列(recvq ),存放的是一系列的G
-
因读阻塞的goroutine会被向channel写入数据的goroutine唤醒;
-
因写阻塞的goroutine会被从channel读数据的goroutine唤醒;
4、类型信息
一个channel中,只能存放同一种数据类型的数据,因为我们在发送或者接收的时候,是通过指针来操作数据的,我们通过数据类型可以结算内存地址
-
elemtype代表类型,用于数据传递过程中的赋值;
-
elemsize代表类型大小,用于在buf中定位元素位置。
5、锁
发送和接收的全过程是加锁的,用以保证goroutine安全
6、sudog
等待的G队列,都会包装成sudog的数据结构
二、初始化channel
初始化分两种情况:有缓冲区、无缓冲区
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
// 检查元素的大小
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 检查元素大小 * 缓冲队列长度 是否会溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// elem.size*size 结果为0, 要么是 elem.size 为0, 要么是创建的size为0
// 此时只分配 hchan 大小的内存.
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// elem 当中不包含指针. hchan和buf的地址是连续的. 此时分配 hchanSize + mem
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// elem 包含指针, 两次内存操作. hchan 和 buf的地址是不连续的
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 缓冲区元素类型大小
c.elemsize = uint16(elem.size)
// 缓冲区元素类型
c.elemtype = elem
// 总的容量
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\\n")
}
return c
}
三、发送数据
1、recvq 不为空,说明缓冲队列里没有数据,此时直接从recvq中取出一个数据,并传递
2、recvq 为空,且缓冲队列还有空余 ,将数据写入缓冲区
3、缓冲队列已满 , sender加入到sendq
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
//快速的判断:非阻塞模式下 且 chan未关闭 且 队列已满 , 则直接返回
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//先加锁
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//尝试从接收的G队列中获取一个G,如果成功,那么直接发送数据给对应的G
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
//判断环形队列是否还有空间,如果有,那么将数据加入队列
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
//qp是新加入数据的指针地址
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
//将发送数据 移动至 qp
typedmemmove(c.elemtype, qp, ep)
//下一个发送数据的槽位++
c.sendx++
//因为这是一个环,所以sendx到达对位后,从0继续开始
if c.sendx == c.dataqsiz {
c.sendx = 0
}
//队列内数据的count ++
c.qcount++
//解锁
unlock(&c.lock)
return true
}
//如果是非阻塞模式 , 那么快速失败
if !block {
unlock(&c.lock)
return false
}
//返回当前G的指针
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 将待阻塞的当前G 打包成sudog数据结构
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
//将打包后的结构加入到发送的队列
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
//接触当前G 与 M绑定关系 , 也就是G阻塞了 , 但是M需要继续处理P下的其他G
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 保持发送数据的活性值,直到被消费
KeepAlive(ep)
// 唤醒后的校验
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
四、接收数据
1、sendq 不为空 ,且没有缓冲区 ,直接取出一个sender ,接收数据
2、sendq 不为空 , 且有冲换区 , a、取出缓冲区的数据复制给recvg ; b、取出一个sender 取出数据放到buf中 ,并唤醒G
3、sendq 为空 , 缓冲区为空 , 加入到等待队列
4、sendq 为空 , 缓冲区不为空 , 取出数据
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if debugChan {
print("chanrecv: chan=", c, "\\n")
}
// 当前缓冲队列= nil 而且是非阻塞模式,直接返回
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 非阻塞 且 缓冲是重的 , 直接返回
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁
lock(&c.lock)
// 非关闭状态 且 缓冲没有数据
// 换句话说 只要是队列里有数据, 都进不来,可以继续执行
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
//从发送队列尝试获取一个G
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 缓存队列中有数据
if c.qcount > 0 {
// 取出一个数据
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
// 将取出的数据 复制给接收对象ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清除qp的数据
typedmemclr(c.elemtype, qp)
// 待消费的数据指针向后移动
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
//缓存数据-- , 解锁
c.qcount--
unlock(&c.lock)
return true, true
}
// 走到这里 表明 sender队列中没有等待的G , 缓存的队列中没有数据
// 如果是非阻塞模式, 快速范围
if !block {
unlock(&c.lock)
return false, false
}
// 获取不到任何待消费的数据, 且是阻塞模式 ,那么阻塞吧
// 获取当前的G ,并包装成Sudog结构
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep //接收的数据对象
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
//Sudog结构加入接收的等待队列
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// 休眠
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
// 释放sudog
releaseSudog(mysg)
return true, success
}
//sendq不为空的情况下的处理方法
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 不带缓存的情况
if c.dataqsiz == 0 {
// 非缓冲型通道, 或者同步通道.
// 设置 -race
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
// 将去除的sender中的数据 复制给接收对象ep
recvDirect(c.elemtype, sg, ep)
}
} else {
// 获取待接收的数据
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
// 将待接收的数据 复制给接收对象ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
// 将sender中的数据 复制给recvx指针的地址 , 另外将recvx 向后移动一位
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
// 如果已经取到队尾了 , 那么从0继续开始
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒sender
goready(gp, skip+1)
}
//sendq不为空 且 没有缓冲区 ,直接发送
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
// 从发送队列中获取数据
src := sg.elem
// 将发送队列的数据 复制给接收的对象dst
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
五、关闭
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
//加锁
lock(&c.lock)
// 重复关闭一个 chan
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
// release all readers
// 释放所有接收的recvq
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 释放所有接收的sendq
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
// 唤醒 glist 当中所有的 gp. 这样做的好处是减少锁定的时间
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
以上是关于golang源码之channel的主要内容,如果未能解决你的问题,请参考以下文章
golang 片段7 for https://medium.com/@francesc/why-are-there-nil-channels-in-go-9877cc0b2308