深入理解Golang之channel

Posted YuNansen

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解Golang之channel相关的知识,希望对你有一定的参考价值。

前言

Golang在并发编程上有两大利器,分别是channel和goroutine,这篇文章我们先聊聊channel。熟悉Golang的人都知道一句名言:“使用通信来共享内存,而不是通过共享内存来通信”。这句话有两层意思,Go语言确实在sync包中提供了传统的锁机制,但更推荐使用channel来解决并发问题。这篇文章会先从channel的用法、channel的原理两部分对channel做一个较为深入的探究。

channel用法

什么是channel

从字面上看,channel的意思大概就是管道的意思。channel是一种go协程用以接收或发送消息的安全的消息队列,channel就像两个go协程之间的导管,来实现各种资源的同步。可以用下图示意:

channel的用法很简单:

	func main()     
		ch := make(chan int, 1) // 创建一个类型为int,缓冲区大小为1的channel    
		ch <- 2 // 将2发送到ch    
		n, ok := <- ch // n接收从ch发出的值    
		if ok         
			fmt.Println(n) // 2    
		    
		close(ch) // 关闭channel
	

使用channel时有几个注意点:

  • 向一个nil channel发送消息,会一直阻塞;
  • 向一个已经关闭的channel发送消息,会引发运行时恐慌(panic);
  • channel关闭后不可以继续向channel发送消息,但可以继续从channel接收消息;
  • 当channel关闭并且缓冲区为空时,继续从从channel接收消息会得到一个对应类型的零值。

Unbuffered channels与Buffered channels

Unbuffered channels是指缓冲区大小为0的channel,这种channel的接收者会阻塞直至接收到消息,发送者会阻塞直至接收者接收到消息,这种机制可以用于两个goroutine进行状态同步;Buffered channels拥有缓冲区,发送者在将消息发送到缓冲区之前是阻塞的,当缓冲区已满时,发送者会阻塞;当缓冲区为空时,接收者会阻塞。
引用The Nature Of Channels In Go中的两张图来说明Unbuffered channels与Buffered channels, 非常形象,读者可自行体会一下:

Unbuffered channels:

Buffered channels:

channel的遍历

for range

channel支持 for range 的方式进行遍历:

	package main  
	import "fmt"  
	func main()       
		ci := make(chan int, 5)      
		for i := 1; i <= 5; i++         
			ci <- i    
		        
		close(ci)      
		for i := range ci           
			fmt.Println(i)      
		  
	  

值得注意的是,在遍历时,如果channel 没有关闭,那么会一直等待下去,出现 deadlock 的错误;如果在遍历时channel已经关闭,那么在遍历完数据后自动退出遍历。也就是说,for range 的遍历方式时阻塞型的遍历方式。

for select

select可以处理非阻塞式消息发送、接收及多路选择。

package main  
import "fmt"  
func main()       
	ci := make(chan int, 2)    
	for i := 1; i <= 2; i++         
		ci <- i    
	    
	close(ci)    
	cs := make(chan string, 2)    
	cs <- "hi"    
	cs <- "golang"    
	close(cs)    
	ciClosed, csClosed := false, false    
	for         
		if ciClosed && csClosed            
 			return        
 		        
 		select         
 			case i, ok := <-ci:            
 				if ok                 
 					fmt.Println(i)            
 				 else                 
 					ciClosed = true                
 					fmt.Println("ci closed")            
 				        
 			case s, ok := <-cs:            
 				if ok                 
 					fmt.Println(s)            
 				 else                 
 					csClosed = true                
 					fmt.Println("cs closed")            
 				        
 			default:            
 				fmt.Println("waiting...")        
		    
 	
  

select中有case代码块,用于channel发送或接收消息,任意一个case代码块准备好时,执行其对应内容;多个case代码块准备好时,随机选择一个case代码块并执行;所有case代码块都没有准备好,则等待;还可以有一个default代码块,所有case代码块都没有准备好时执行default代码块。

channel原理

先贴一下channel的源码地址,读者可以对照来看。

数据结构

先看channel的结构体:

type hchan struct     
	qcount   uint           // total data in the queue    
	dataqsiz uint           // size of the circular queue    
	buf      unsafe.Pointer // points to an array of dataqsiz elements    // channel中元素大小    
	elemsize uint16     // 是否已关闭    
	closed   uint32    // channel中元素类型    
	elemtype *_type // element type    
	sendx    uint   // send index    
	recvx    uint   // receive index    
	recvq    waitq  // list of recv waiters   
 	sendq    waitq  // list of send waiters    
	 // lock protects all fields in hchan, as well as several    
	 // fields in sudogs blocked on this channel.    
	 //    
	 // Do not change another G's status while holding this lock    
	 // (in particular, do not ready a G), as this can deadlock    
	 // with stack shrinking.    
 	lock mutex

channel的缓冲区其实是一个环形队列,qcount表示队列中元素的数量,dataqsiz表示环形队列的总大小,buf表示一个指向循环数组的指针;sendx和recvx分别用来标识当前发送和接收的元素在循环队列中的位置;recvq和sendq都是一个列表,分别用于存储当前处于等待接收和等待发送的Goroutine。

再看一下waitq的数据结构:

type waitq struct     
	first *sudog    
	last  *sudog


type sudog struct     
	// 当前goroutine    
	g *g    
	// isSelect indicates g is participating in a select, so    
	// g.selectDone must be CAS'd to win the wake-up race.    
	isSelect bool    
	next     *sudog    
	prev     *sudog    
	elem     unsafe.Pointer // data element (may point to stack)    
	// The following fields are never accessed concurrently.    
	// For channels, waitlink is only accessed by g.    
	// For semaphores, all fields (including the ones above)    
	// are only accessed when holding a semaRoot lock.    
	acquiretime int64    
	releasetime int64    
	ticket      uint32    
	parent      *sudog // semaRoot binary tree    
	waitlink    *sudog // g.waiting list or semaRoot    
	waittail    *sudog // semaRoot    
	c           *hchan // channel

其中sudog表示处于等待列表中的Goroutine封装,包含了一些上下文信息,first和last分别指向等待列表的首位的Goroutine。

编译分析

在分析channel的原理之前,我们先使用go tool分析以下代码,看看channel的各种操作在底层调用了什么运行时方法:

ch := make(chan int, 2)
ch <- 2
ch <- 1
<-ch
n, ok := <-ch
if ok     
	fmt.Println(n)

close(ch)

编译

go build test.go
go tool objdump -s "main\\.main" test | grep CALL

把CALL过滤出来:

  test.go:118           0x1092f55               e81612f7ff              CALL runtime.makechan(SB)  test.go:119           0x1092f74               e82714f7ff              CALL runtime.chansend1(SB)  test.go:120           0x1092f8e               e80d14f7ff              CALL runtime.chansend1(SB)  test.go:121           0x1092fa5               e8361ff7ff              CALL runtime.chanrecv1(SB)  test.go:122           0x1092fbd               e85e1ff7ff              CALL runtime.chanrecv2(SB)  test.go:126           0x1092fd7               e8841cf7ff              CALL runtime.closechan(SB)  test.go:124           0x1092fea               e8b156f7ff              CALL runtime.convT64(SB)  print.go:275          0x1093041               e88a98ffff              CALL fmt.Fprintln(SB)  test.go:47            0x1093055               e896c1fbff              CALL runtime.morestack_noctxt(SB)

创建

从上面的编译分析可以看出在创建channel时调用了运行时方法makechan:

func makechan(t *chantype, size int) *hchan     elem := t.elem    // compiler checks this but be safe.    if elem.size >= 1<<16         throw("makechan: invalid channel element type")        if hchanSize%maxAlign != 0 || elem.align > maxAlign         throw("makechan: bad alignment")        // 计算缓冲区需要的总大小(缓冲区大小*元素大小),并判断是否超出最大可分配范围    mem, overflow := math.MulUintptr(elem.size, uintptr(size))    if overflow || mem > maxAlloc-hchanSize || size < 0         panic(plainError("makechan: size out of range"))        // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.    // buf points into the same allocation, elemtype is persistent.    // SudoG's are referenced from their owning thread so they can't be collected.    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.    var c *hchan    switch     case mem == 0:        // 缓冲区大小为0,或者channel中元素大小为0(struct)时,只需分配channel必需的空间即可        // Queue or element size is zero.        c = (*hchan)(mallocgc(hchanSize, nil, true))        // Race detector uses this location for synchronization.        c.buf = c.raceaddr()    case elem.kind&kindNoPointers != 0:        // 通过位运算知道channel中元素类型不是指针,分配一片连续内存空间,所需空间等于 缓冲区数组空间 + hchan必需的空间。        // Elements do not contain pointers.        // Allocate hchan and buf in one call.        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))        c.buf = add(unsafe.Pointer(c), hchanSize)    default:        // 元素中包含指针,为hchan和缓冲区分别分配空间        // Elements contain pointers.        c = new(hchan)        c.buf = mallocgc(mem, elem, true)        c.elemsize = uint16(elem.size)    c.elemtype = elem    c.dataqsiz = uint(size)    if debugChan         print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\\n")        return c

makechan的代码逻辑还是比较简单的,首先校验元素类型和缓冲区空间大小,然后创建hchan,分配所需空间。这里有三种情况:当缓冲区大小为0,或者channel中元素大小为0时,只需分配channel必需的空间即可;当channel元素类型不是指针时,则只需要为hchan和缓冲区分配一片连续内存空间,空间大小为缓冲区数组空间加上hchan必需的空间;默认情况,缓冲区包含指针,则需要为hchan和缓冲区分别分配内存。最后更新hchan的其他字段,包括elemsize,elemtype,dataqsiz。

发送

未完待续

转自:https://juejin.im/post/5decff136fb9a016544bce67

以上是关于深入理解Golang之channel的主要内容,如果未能解决你的问题,请参考以下文章

深入理解Golang中的Context包

深入理解Golang中的Context包

golang源码之channel

golang并发编程之channel

Golang入门到项目实战 golang并发变成之通道channel

golang之goroutine和channel