go源码阅读——chan.go

Posted Wang-Junchao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go源码阅读——chan.go相关的知识,希望对你有一定的参考价值。

【博文目录>>>】 【项目地址>>>】

chan.go是go语言通道实现,通道结构的定义,接收和发送的操作都此文件中实现。

通道的结构

hchan是通道表示的基本结构,其内容表示如下:
一些特殊情况

  • 当dataqsiz=0时:说明这是一个无缓冲对列
  • 当dataqsiz>0时,说明是一个缓冲对列
type hchan struct 
    qcount   uint               // 队列中的数据总数
    dataqsiz uint               // 循环队列的大小
    buf      unsafe.Pointer     // 指向dataqsiz数组中的一个元素
    elemsize uint16
    closed   uint32             // 非0表示通道已经关闭
    elemtype *_type             // 元素类型
    sendx    uint               // 发送索引
    recvx    uint               // 接收索引
    recvq    waitq              // 所有等待的接收者
    sendq    waitq              // 所有等待的发送者

    // 锁保护hchan中的所有字段,以及此通道上阻止的sudogs中的多个字段。
    // 保持该锁状态时,请勿更改另一个G的状态(特别是不要准备好G),因为这会因堆栈收缩而死锁。
    // Question: sudogs?参见runtime/runtime2.go中的sudog结构
    lock mutex

通道的发送和接收

通道的发送和接收都是使用了一个名为waitq的等待对例,每个接收者和发送者都是一个sudog结构,此构在runtiem/runtime2.go文件中定义,在之后的源码分析中会仔细说明。

type waitq struct 
	first *sudog    // 队列头
	last  *sudog    // 队列尾

通道的对齐方法式

从文件的源码中可以知道,在内存中通道是以8字节的方式进行对齐的,内存分配不是8个字节会自动对对齐

const (
	maxAlign  = 8 // 8字节对齐
	hchanSize = unsafe.Sizeof(hchan) + uintptr(-int(unsafe.Sizeof(hchan))&(maxAlign-1))
	debugChan = false
)

通道的使用

通道的创建的方法

在源码中通道的创建有三个实现方法

  • func reflect_makechan(t *chantype, size int) *hchan …
  • func makechan64(t *chantype, size int64) *hchan …
  • func makechan(t *chantype, size int) *hchan …
    其本质是调用最后一个通道的创建方法。

通道的创建过程

方法func makechan(t *chantype, size int) *hchan ...说明了通道的整个创建过程。

/**
 * 创建通道
 * @param t 通道类型指针
 * @param size 通道大小,0表示无缓冲通道
 * @return
 **/
func makechan(t *chantype, size int) *hchan ...

通道创建要通过以过几个步骤。

  • 1、检查通道元素类型的size,如果elem.size >= 1<<16,则创建失败。
  • 2、检查通道和元素的对齐,如果hchanSize%maxAlign != 0 || elem.align > maxAlign,则通道创建失败
  • 3、计算通道需要分配的内存,如果overflow || mem > maxAlloc-hchanSize || size < 0,则通道创建失败
    • 3.1、overflow:表示内存内存计算有溢出
    • 3.2、mem > maxAlloc-hchanSize:表示内存分配置超出了限制
    • 3.3、size < 0:表示通道缓冲为负数
  • 4、根据不同条件进行内存分配
    • 4.1、mem == 0:队列或者元素大小为0,这是表示元素或者队列大小为0,直接分配hchanSize大小的内存,缓冲地址向自身
    • 4.2、elem.ptrdata == 0:元素不包含指针,直接分配hchanSize+mem大小的内存,并且缓冲地址向自身+hchanSize
    • 4.3、其他情况(元素包含指针):使用new的方法创建一个hchan,分配mem大小的内存,并且缓冲地址向内存分配的地址
  • 5、最后设置通道结构的其他值

向通道中发送数据

向通道中发送数据的方法一共有四个,如下所示

  • func chansend1(c *hchan, elem unsafe.Pointer) …
  • func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool …
  • func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) …
  • func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) …
    chansend1方法是go编译代码中c <- x的入口点,即当我们编写代码 c <- x时,就是调用此方法。chansend1方法本质还是调用chansend方法进行处理
    这四个方法的调用关系:chansend1 -> chansend -> send -> sendDirect

chansend方法

chansend方法的执行代表了事个数据的发送过程,方法签名如下:

/**
  * 通用单通道发送/接收
  * 如果block不为nil,则协议将不会休眠,但如果无法完成则返回。
  *
  * 当涉及休眠的通道已关闭时,可以使用g.param == nil唤醒休眠。
  * 最容易循环并重新运行该操作; 我们将看到它现已关闭。
  * @param c 通道对象
  * @param ep 元素指针
  * @param block 是否阻塞
  * @param callerpc 调用者指针
  * @return bool true:表示发送成功
  **/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool ...

通道数据发送的处理过程:

  • 1、如果通道为空
    • 1.1、通道是非阻塞的,则返回false,表示发送失败,方法结束
    • 1.2、通道是阻塞的,则会阻塞当前goroutine,同时会执行一个throw方法,方法结束
  • 2、检查通道和数据状态,2.1,2.2,2.3同时满足时发关失败,方法结束
    • 2.1、!block:通道非阻塞
    • 2.2、c.closed == 0:通道未关闭
    • 2.3、((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz))
      • 2.3.1、(c.dataqsiz == 0 && c.recvq.first == nil):通道中没有数据,并且没有接收者
      • 2.3.2、(c.dataqsiz > 0 && c.qcount == c.dataqsiz):通道中有数据,并且已经满了
  • 3、对通道进行锁定
  • 4、判断通道是否已经关闭,如果已经关闭,就解锁通道,并且抛出panic。方法结束
  • 5、在接收者队列中找出一个最先入队的接收者,如果有,就调用send方法进行发送,返回true,方法结束
  • 6、如果没有找到接收者,并且c.qcount < c.dataqsiz,即通道的发送缓冲区未满,将要发送的数据拷贝到通道缓冲区,更新相关的计数据信息,并释放锁,返回true,方法结束
  • 7、没有找到接收都,并且没有缓冲可用,非阻塞方式,则解锁通道,返回false,发送失败
  • 8、没有找到接收者,并且没有缓冲可用,阻塞方式。获取gp(g)和mysg(sudog),并且将发送数据挂到mysg上,将mysg加到发送队列。调用gopark访求将当前goroutine阻塞,直到被恢复。
  • 9、在恢复后我们还要将发送数据保活,以确保数据正确被接收者复制出去了。
  • 10、检查goroutine状态,如果mysg != gp.waiting说明被破坏了,执行throw,方法结束
  • 11、如果gp.param == nil,说明唤醒有问题,
    • 11.1、如果通道未关闭,则说明是伪唤醒,执行throw方法结束
    • 11.2、如果通道关闭,则panic,在关闭的通道中进行了发送消息。
  • 12、最后是清理数据,并且释放mysg

其他相关方法

/**
 * 编译器实现,将goroutine的select receive(v = <-c)语句转成对应的方法执行
 * select 
 * case v = <-c:
 * 	... foo
 * default:
 * 	... bar
 * 
 * 
 * => (实际对应)
 * 
 * if selectnbrecv(&v, c) 
 * 	... foo
 *  else 
 * 	... bar
 * 
 */
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) ...

从通道中接收数据

chan.go文件中一共有五个方法实现用于接收通道内容,他们分别是:

  • func chanrecv1(c *hchan, elem unsafe.Pointer) …,
  • func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) …
  • func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) …
  • func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) …
  • func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) …
    其调用链 (chanrecv1|chanrecv2) -> chanrecv -> recv -> recvDirect
    其中chanrecv1方法是go代码<-c的入口点

chanrecv方法

chanrecv方法实现的通道接收的主要功能,其方法签名:

/**
 * 通用单通道发送/接收
 * 如果block不为nil,则协议将不会休眠,但如果无法完成则返回。
 *
 * 当涉及休眠的通道已关闭时,可以使用g.param == nil唤醒休眠。
 * 最容易循环并重新运行该操作; 我们将看到它现已关闭。
 * @param c 通道对象
 * @param ep 元素指针
 * @param block 是否阻塞
 * @param callerpc 调用者指针
 * @return bool true:表示发送成功
 **/
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) ...

通道数据接收的处理过程如下:

  • 1、如通道接收通道为nil
    • 1.1、如果接收是非阻塞的,则返回false,方法结束
    • 1.2、如果接收是阻塞的,则阻塞goroutine,即此goroutine会永久阻塞
  • 2、当2.1,2.2,2.3同时满足时,则返回false,方法结束
    • 2.1、!block == true,即非阻塞
    • 2.2、c.closed == 0,通道未关闭
    • 2.3、2.3.1,2.3.2两个满足其中一个
      • 2.3.1、c.dataqsiz == 0 && c.sendq.first == nil,通道中没有数据,并且没有发送者
      • 2.3.2、c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0,通道中有空间,但没有数据
  • 3、加锁
  • 4、如果通道已经关闭,并且没有数据,则解锁,进行内存清理,返回(true, false),方法结束
  • 5、从发送队列中取队头发送者,如果不为空,就用发送者发送的数据,返回(true, true),方法结束
  • 6、没有发送者,数据队列不为空(c.qcount > 0),直接从数据队列中接收数据,并且更新队列计数和接收索引指针,然后复制数据,清理内存,释放锁,最后返回(true, true),方法结束
  • 7、没有发送者,并且队列为空,并且是非阻塞状态,则释放锁,返回(false, false),方法结束
  • 8、没有找到发送都,并且没有缓冲可用,阻塞方式。获取gp(g)和mysg(sudog),并且将接收数据指针挂到mysg上,将mysg加到接收队列。调用gopark访求将当前goroutine阻塞,直到被恢复。
  • 9、在恢复后我们还是判断是否是意外唤醒,如果是,就panic,方法结束
  • 10、进行清理工作,释放sudog,返回(true, !closed)

recv方法

通道另一个核心方法就是recv,其方法签名如下:

/**
 * recv在完整通道c上处理接收操作。
 * 有2个部分:
 *      1)将发送方sg发送的值放入通道中,并唤醒发送方以继续进行。
 *      2)接收方接收到的值(当前G)被写入ep。
 * 对于同步通道,两个值相同。
 * 对于异步通道,接收者从通道缓冲区获取数据,而发送者的数据放入通道缓冲区。
 * 频道c必须已满且已锁定。 recv用unlockf解锁c。
 * sg必须已经从c中出队。
 * 非nil必须指向堆或调用者的堆栈。
 * @param c 通道对象针对
 * @param sg sudog指针
 * @param ep 用户接收元素的指针
 * @param unlockf 解锁函数
 * @param skip
 * @return
 **/
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) ...

recv接收数据的过程

  • 1、对于无缓冲通道(c.dataqsiz == 0),当数据指针不空的时候,直接拷贝发送指针所指的数据
  • 2、对于缓冲通道,将数据写入到缓冲中,并且更新缓冲计数,接收索引,发送索引
  • 3、进行goroutine相关处理,释放锁,并且将当前goroutine置于等待

其他相关方法

/**
 * 编译器实现,将goroutine的select receive(v = <-c)语句转成对应的方法执行
 * select 
 * case v = <-c:
 * 	... foo
 * default:
 * 	... bar
 * 
 * 
 * => (实际对应)
 * 
 * if selectnbrecv(&v, c) 
 * 	... foo
 *  else 
 * 	... bar
 * 
 **/
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) ...

/**
 * 编译器实现,将goroutine的select receive(case v, ok = <-c:)语句转成对应的方法执行
 * select 
 * case v, ok = <-c:
 *  ... foo
 * default:
 *  ... bar
 * 
 *
 * => (实际对应)
 *
 * if c != nil && selectnbrecv2(&v, &ok, c) 
 *  ... foo
 *  else 
 *  ... bar
 * 
 **/
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) ...

通道关闭

chan.go中只有一个关闭通道的方法

  • func closechan(c *hchan) …
    通过的关闭过程
  • 1、如果通道为nil,则painc,方法结束
  • 2、锁定通道
  • 3、如果通道已经关闭了,解锁通道,并且panic,方法结束,即通道只能关闭一次
  • 4、标记通道已经关闭
  • 5、释放所有的接收者,将接收者入gList队列
  • 6、释放所有的发送者,将发送者入gList队列
  • 7、释放锁
  • 8、将gLsit中的元素都标记成进入goready状态

源码

package runtime

// This file contains the implementation of Go channels.

// Invariants:
//  At least one of c.sendq and c.recvq is empty,
//  except for the case of an unbuffered channel with a single goroutine
//  blocked on it for both sending and receiving using a select statement,
//  in which case the length of c.sendq and c.recvq is limited only by the
//  size of the select statement.
//
// For buffered channels, also:
//  c.qcount > 0 implies that c.recvq is empty.
//  c.qcount < c.dataqsiz implies that c.sendq is empty.
/**
 * 此文件包含Go渠道的实现。此包中所使用的类型大部分定义在:runtime/type.go文件中
 *
 * 不变量:
 * c.sendq和c.recvq中的至少一个为空,但在无缓冲通道上阻塞了单个goroutine以便使用select语句发送和接收的情况除外,在这种情况下,
 * c.sendq的长度 而c.recvq仅受select语句的大小限制。
 *
 * 对于缓冲通道,还:
 * c.qcount> 0表示c.recvq为空。
 * c.qcount <c.dataqsiz表示c.sendq为空。
 */
import (
	"runtime/internal/atomic"
	"runtime/internal/math"
	"unsafe"
)

const (
	maxAlign  = 8 // 8字节对齐
	hchanSize = unsafe.Sizeof(hchan) + uintptr(-int(unsafe.Sizeof(hchan))&(maxAlign-1))
	debugChan = false
)

type hchan struct 
	qcount   uint           // total data in the queue // 队列中的数据总数
	dataqsiz uint           // size of the circular queue   // 循环队列的大小
	buf      unsafe.Pointer // points to an array of dataqsiz elements // 指向dataqsiz数组中的一个元素
	elemsize uint16
	closed   uint32  // 非0表示通道已经关闭
	elemtype *_type // element type // 元素类型
	sendx    uint   // send index   // 发送索引
	recvx    uint   // receive index // 接收索引
	recvq    waitq  // list of recv waiters // 所有等待的接收者
	sendq    waitq  // list of send waiters // 所有等待的发送者

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	// 锁保护hchan中的所有字段,以及此通道上阻止的sudogs中的多个字段。
    // 保持该锁状态时,请勿更改另一个G的状态(特别是不要准备好G),因为这会因堆栈收缩而死锁。
    // Question: sudogs?参见runtime/runtime2.go中的sudog结构
	lock mutex


/**
 * 等待队列数据结棍
 */
type waitq struct 
	first *sudog    // 队列头
	last  *sudog    // 队列尾


//go:linkname reflect_makechan reflect.makechan
/**
 * 通过反射创建通道,
 * @param
 * @return
 **/
func reflect_makechan(t *chantype, size int) *hchan 
	return makechan(t, size) // 在reflect/value.go/makechan方法中实现


/**
 * 创建通道
 * @param t 通道类型
 * @param size 看上去是支持64位int,本质上只支持int类型
 * @return
 **/
func makechan64(t *chantype, size int64) *hchan 
	if int64(int(size)) != size  // 说明有溢出
		panic(plainError("makechan: size out of range"))
	

	return makechan(t, int(size))


/**
 * 创建通道
 * @param t 通道类型指针
 * @param size 通道大小,0表示无缓冲通道
 * @return
 **/
func makechan(t *chantype, size int) *hchan 
	elem := t.elem

	// compiler checks this but be safe.
	// 编译器对此进行检查,但很安全。
	if elem.size >= 1<<16 
		throw("makechan: invalid channel element type")
	
	// 非8字节对齐
	if hchanSize%maxAlign != 0 || elem.align > maxAlign 
		throw("makechan: bad alignment")
	

    // MulUintptr返回a * b以及乘法是否溢出。 在受支持的平台上,这是编译器固有的功能。
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	// 发生溢出,或者分配内存超限制,或者size<0
	if overflow || mem > maxAlloc-hchanSize || size < 0 
		panic(plainError("makechan: size out of range"))
	

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	// 当存储在buf中的元素不包含指针时,Hchan不包含GC感兴趣的指针。 buf指向相同的分配,
	// elemtype是持久的。 SudoG是从它们自己的线程中引用的,因此无法收集它们。
    // TODO(dvyukov,rlh):重新考虑何时收集器可以移动分配的对象。
	var c *hchan
	switch 
	case mem == 0: // 不需要分配置内存空间
		// Queue or element size is zero.
	    // 队列或元素大小为零。
	    // mallocgc分配一个大小为size字节的对象。 小对象是从每个P缓存的空闲列表中分配的。
	    // 大对象(> 32 kB)直接从堆中分配。
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		// 竞态探测器使用此位置进行同步。
		c.buf = c.raceaddr()
	case elem.ptrdata == 0: // 无指针数据
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		// 元素不包含指针。 在一次调用中分配hchan和buf。
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize) // 修改数据指针地址
	default:
		// Elements contain pointers.
	    // 元素包含指针。
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	

	c.elemsize = uint16(elem.size) // 设置元素大小
	c.elemtype = elem   // 设置元素类型
	c.dataqsiz = uint(size) // 设置通道大小

	if debugChan 
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\\n")
	
	return c


// chanbuf(c, i) is pointer to the i'th slot in the buffer.
/**
 * chanbuf(c, i) 返回指向缓冲区中第i个槽值的指针。
 * @param c 通道对象
 * @return i 第i个槽位
 **/
func chanbuf(c *hchan, i uint) unsafe.Pointer 
	return add(c.buf, uintptr(i)*uintptr(c.elemsize))


// entry point for c <- x from compiled code
//go:nosplit
/**
 * 编译代码中c <- x的入口点,即当我们编写代码 c <- x时,就是调用此方法
 * @param c 通道对象
 * @param elem 需要发送的元素
 * @return
 **/
func chansend1(c *hchan, elem unsafe.Pointer) 
	chansend(c, elem, true, getcallerpc())


/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
 /**
  * 通用单通道发送/接收
  * 如果block不为nil,则协议将不会休眠,但如果无法完成则返回。
  *
  * 当涉及休眠的通道已关闭时,可以使用g.param == nil唤醒休眠。
  * 最容易循环并重新运行该操作; 我们将看到它现已关闭。
  * @param c 通道对象
  * @param ep 元素指针
  * @param block 是否阻塞
  * @param callerpc 调用者指针
  * @return bool true:表示发送成功
  **/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool 
	if c == nil  // 通道已为空
		if !block  // 非阻塞
			return false
		
		// 将当前goroutine置于等待状态并调用unlockf。 如果unlockf返回false,则继续执行goroutine程序。
		// unlockf一定不能访问此G的堆栈,因为它可能在调用gopark和调用unlockf之间移动。
        // waitReason参数说明了goroutine已停止的原因。
        // 它显示在堆栈跟踪和堆转储中。
        // waitReason应具有唯一性和描述性。
        // 不要重复使用waitReason,请添加新的waitReason。
        // 更详细的说明参见:runtime/proc.go和runtime/runtime2.go
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	

	if debugChan  // 当前已经为false
		print("chansend: chan=", c, "\\n")
	

	if raceenabled  // 当前已经为false
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation.
	// 快速路径:在没有获取锁的情况下检查失败的非阻塞操作。
    //
    // 观察到通道未关闭后,我们观察到该通道尚未准备好发送。 这些观察中的每一个都是单个字(word)大小的读取
    // (根据通道的类型,取第一个c.closed和第二个c.recvq.first或c.qcount)。
    // 因为关闭的通道无法从“准备发送”转换为“未准备发送”,所以即使通道在两个观测值之间处于关闭状态,
    // 它们也隐含着两者之间的一个时刻,即通道既未关闭又未关闭准备发送。 我们的行为就好像我们当时在观察该通道,
    // 并报告发送无法继续进行。
    //
    // 如果在此处对读取进行了重新排序,也是可以的:如果我们观察到该通道尚未准备好发送,然后观察到它没有关闭,
    // 则意味着该通道在第一次观察期间没有关闭。
    // !block:非阻塞状态
    // c.closed == 0:通道未关闭
    // (c.dataqsiz == 0 && c.recvq.first == nil):通道中没有数据,并且没有接收者
    // (c.dataqsiz > 0 && c.qcount == c.dataqsiz):通道中有数据,并且已经满了
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) 
		return false
	

	var t0 int64
	if blockprofilerate > 0 
		t0 = cputicks()
	

    // 加锁
	lock(&c.lock)

	if c.closed != 0  // 通道已经关闭,解锁,抛出panic
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	

	if sg := c.recvq.dequeue(); sg != nil 
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		// 找到了等待的接收者。 我们绕过通道缓冲区(如果有)将要发送的值直接发送给接收器。
		send(c, sg, ep, func()  unlock(&c.lock) , 3)
		return true
	

    // 没有找到接收者,并且队列元素未填满通道的循环队列
	if c.qcount < c.dataqsiz 
		// Space is available in the channel buffer. Enqueue the element to send.
		// 通道缓冲区中有可用空间。 使要发送的元素入队。
		qp := chanbuf(c, c.sendx)
		if raceenabled  // 此值已经为false
			raceacquire(qp)
			racerelease(qp)
		
		// 执行内存移动
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++ // 指向下一个可以发送数据的空位
		if c.sendx == c.dataqsiz  // 已经达到了末尾
			c.sendx = 0 // 重新指向头部
		
		c.qcount++ // 通道总的数据加1
		unlock(&c.lock) // 解锁
		return true // 说明发送成功
	

    // 非阻塞,因为找不到接收者,所以失败
	if !block 
		unlock(&c.lock)
		return false
	

	// Block on the channel. Some receiver will complete our operation for us.
    // 在通道上阻塞。一些接收器将为我们完成操作。
    // getg将返回指向当前g的指针。获取suodg
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 
		mysg.releasetime = -1
	
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	// 在分配elem和将mysg入队到gp.waitcopy可以找到它的地方之间没有堆栈拆分。
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	// 确保发送的值保持活动状态,直到接收者将其复制出来。
	// sudog具有指向堆栈对象的指针,但是sudog不被视为堆栈跟踪器的根。
	KeepAlive(ep)

	// someone woke us up.
	// 有人把我们唤醒了
	if mysg != gp.waiting 
		throw("G waiting list is corrupted")
	
	gp.waiting = nil
	gp.activeStackChans = false
	if gp.param == nil 
		if c.closed =

以上是关于go源码阅读——chan.go的主要内容,如果未能解决你的问题,请参考以下文章

golang - channel

golang golang_chan_select_timeout.go

golang pipeline_by_running_chan.go

[Go] gocron源码阅读-groutine与channel应用到信号捕获

Go小技巧(二)— 打开已经关闭的channel

golang channel源码阅读