Goroutines和Channels

Posted wujuntian

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Goroutines和Channels相关的知识,希望对你有一定的参考价值。

Go语言中的并发程序可以用两种手段来实现,第一种是传统的并发模型,多线程共享内存,第二种则是现代的并发模型,顺序通信进程(CSP),Go语言使用goroutine和channel来支持顺序通信进程。


一、Goroutines
1. 在Go语言中,每一个并发的执行单元叫作一个goroutine。
2. main goroutine:当一个程序启动时,其主函数即在一个单独的goroutine中运行,称为main goroutine。
3. goroutine创建:新的goroutine使用go语句来创建。在语法上,go语句是一个普通的函数或方法调用前加上关键字go。go语句会使其语句中的函数在一个新创建的goroutine中运行,而go语句本身会迅速地完成。

技术图片

4. 主函数返回时,所有的goroutine都会被直接打断,程序退出。


二、Channels
如果说goroutines是Go语言程序的并发体的话,那么channels就是它们之间的通信机制。
通过channels,一个goroutine可以给另一个goroutine发送值信息。
每个channel能发送的值类型只有一种。
1. channels创建

技术图片

channel是引用类型变量。
2. channels比较
两个相同类型的channel可以使用==运算符比较。如果两个channel引用的是相同的对象,那么比较的结果为真。一个channel也可以和nil进行比较。
3. channels数据发送与接收

技术图片

一个不使用接收结果的接收操作也是合法的。
4. channels关闭

技术图片

基于已经关闭的channel的任何发送操作都将导致panic异常。
基于已经关闭的channel执行接收操作依然可以接收到之前已经发送成功的数据,如果channel中已经没有数据的话,后续接收操作将不再阻塞,而是立即返回一个零值。
5. 不带缓存的channels
一个基于无缓存channel的发送操作将导致发送者goroutine阻塞,直到另一个goroutine在相同的channel上执行接收操作,当发送的值通过channel成功传输以后,两个goroutine才能继续执行后面的语句。反之,如果接收操作先发生,那么接收者goroutine也将阻塞,直至有另一个goroutine在相同的channel上执行发送操作。
基于无缓存channel的发送和接收操作将导致两个goroutine做一次同步操作,因为这个原因,无缓存channel有时也被称为同步channel。
注:
有些消息事件并不携带额外的信息,它们仅仅是用作两个goroutine之间的同步,这时候可以使用struct空结构体作为Channels元素的类型。
6. 串联的channels(pipeline)
channels也可以用于将多个goroutine链接在一起,一个channel的输出作为下一个channel的输入。这种串联的channels就是所谓的管道(pipeline)。
7. 单方向的channel
chan<- int:只发送int的channel,只能发送不能接收
<-chan int:只接收int的channel,只能接收不能发送
这种限制将在编译期检测。
注:
因为关闭操作只用于断言不再向channel发送新的数据,所以只有在发送者所在的goroutine才会调用close函数,因此对一个只接收的channel调用close将是一个编译错误。
任何双向channel向单向channel变量的赋值都将导致隐式转换,从双向channel转换为单向channel。不存在反向转换的语法。
8. 带缓存的channels
带缓存的channel内部持有一个元素队列。

技术图片

channel内部缓存的容量:cap(ch)
channel内部缓存的长度:len(ch)
基于带缓存channel的发送操作就是向其内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素。如果内部缓存队列是满的,那么发送操作将阻塞直到另一个goroutine执行接收操作而释放了新的队列空间。相反,如果channel是空的,接收操作将阻塞直到有另一个goroutine执行发送操作而向队列插入元素。
channel的缓存队列解耦了接收和发送的goroutine。


三、并发的循环
1. goroutine泄露
某个channel的内部缓存队列满了,但是没有goroutine向这个channel执行接收操作,从而导致向这个channel发送值的所有goroutine都永远阻塞下去,并且永远都不会退出。这种情况,称为goroutine泄露,这将是一个bug。和垃圾变量不同,泄露的goroutine并不会被自动回收,因此确保每个不再需要的goroutine都能正常退出是很重要的。
2. sync.WaitGroup
有时候,我们需要多个goroutine都运行结束以后做一些事情,比如关闭一个channel。Go语言提供了一种特殊的计数器,用于检测多个goroutine是否都已运行结束。它在每一个goroutine启动时自增1,在每一个goroutine退出时自减1,且会一直等待直至计数器自减为0,即表示所有goroutine都已运行结束。
使用示例:

func makeThumbnails(filenames <-chan string) int64 
	sizes := make(chan int64)
	var	wg sync.WaitGroup//number of working goroutines
	for	f := range filenames 
		wg.Add(1)//必需在worker goroutines开始之前调用,才能保证Add()是在closer goroutine调用Wait()之前被调用
		//worker goroutines
		go func(f string) 
			defer wg.Done()//使用defer来确保计数器即使是在程序出错的情况下依然能够正确地自减
			thumb, err := thumbnail.ImageFile(f)
			if err != nil 
				log.Println(err)
				return
			
			info, _ := os.Stat(thumb)
			sizes <- info.Size()
		(f)
	
	//closer goroutine
	go func() 
		wg.Wait()
		close(sizes)
	()
	var	total int64
	for	size := range sizes 
		total += size
	
	return total

 

四、基于select的多路复用
有时候我们需要等待多个channel的其中一个返回事件,但是我们又无法做到从每一个channel中接收信息。假如我们试图从其中一个channel中接收信息,而这个channel又没有返回信息,那么程序会立刻被阻塞,从而无法收到其他channel返回的信息。这时,基于select的多路复用就派上用场了。

技术图片

select会一直等待直至有能够执行的case分支才去执行这个case分支。当条件满足时,select才会去通信并执行case之后的语句;这时候其它通信是不会执行的。一个没有任何case的select语句写作select,会永远地等待下去。
如果多个case同时就绪时,select会随机地选择一个执行,这样来保证每一个channel都有平等的被select的机会。
channel的零值是nil,对一个nil的channel发送和接收操作会永远阻塞,在select语句中操作nil的channel永远都不会被select到。

 

五、消息广播
有时候,我们需要将某个消息广播通知所有运行中的goroutine,但是我们又无法知道goroutine的数量,这时候可以使用这种策略:创建一个空的channel,将从这个channel中接收信息的操作加入基于select的多路复用,由于这时channel是空的,所有的goroutine都无法从中接收到值;当我们需要广播消息的时候,关闭这个channel,这样所有的goroutine都能从中接收到零值,以此表示该消息已传达。(一个channel只能用于表达一种消息)

 

六、一个综合运用的实例程序——并发的字典遍历

/**
*广度遍历目录文件,计算文件数量与大小
*每隔0.5秒输出已计算的文件数量与大小
*当标准输入有值时,中断程序运行
**/
package main

import (
	"flag"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"
	"time"
)

//广度遍历目录文件,计算文件数量与大小
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) 
	defer n.Done()
	if cancelled() 
		return
	
	for _, entry := range dirents(dir) 
		if entry.IsDir() 
			n.Add(1)
			subdir := filepath.Join(dir, entry.Name())
			go walkDir(subdir, n, fileSizes)
		 else 
			fileSizes <- entry.Size()
		
	


/*
*目的:控制同一时刻打开的文件数量,防止程序占用太多资源
*实现:创建一个带缓存的channel,通过channel的缓存队列大小控制同一时刻打开的文件数量。
*读取文件之前向该channel发送一个信号量,读取完以后向该channel接收一个信号量,若是channel的缓存
*队列满了,则会阻塞,无法继续打开文件。
 */
var sema = make(chan struct, 2)

func dirents(dir string) []os.FileInfo 
	select 
	case sema <- struct:
	case <-done:
		return nil
	
	defer func()  <-sema ()
	entries, err := ioutil.ReadDir(dir)
	if err != nil 
		fmt.Fprint(os.Stderr, "du1: %\\n", err)
		return nil
	
	return entries


/*
*目的:读取标准输入,只要有值则停止运行程序
*实现:创建一个空channel,每个goroutine读取文件之前都先从channel中接收值,如果阻塞,表示程序可以继续运行,
*当标准输入有值时,关闭该channel,所有未读取文件的goroutine从channel中接收到零值,中断程序运行。
 */
var done = make(chan struct)

func cancelled() bool 
	select 
	case <-done:
		return true
	default:
		return false
	


func main() 
	//读取标准输入,只要有输入则停止程序
	go func() 
		os.Stdin.Read(make([]byte, 1))
		close(done)
	()

	flag.Parse()
	roots := flag.Args()
	if len(roots) == 0 
		roots = []string"."
	
	//用于发送文件大小的channel
	fileSizes := make(chan int64)

	/*
	*目的:当读取文件大小的所有goroutine都结束运行时,关闭用于发送文件大小的channel
	*实现:使用特殊计数器sync.WaitGroup,每次新增一个goroutine则自增1,每次结束一个goroutine
	*则自减1,当其自减为0时,表示所有goroutine都已结束运行,可以关闭channel了
	 */
	var n sync.WaitGroup
	for _, root := range roots 
		n.Add(1)
		go walkDir(root, &n, fileSizes)
	
	go func() 
		n.Wait()
		close(fileSizes)
	()

	/*
	*目的:每隔0.1秒输出文件数量与大小计算的结果
	*实现:使用time.Tick函数。time.Tick函数返回一个channel,程序会周期性地像一个节拍器一样向这个channel发送事件,
	*每一个事件的值是一个时间戳。当从该channel中接收到值的时候,就可以打印了。
	 */
	var tick <-chan time.Time
	tick = time.Tick(100 * time.Millisecond)

	var nfiles, nbytes int64
loop:
	for 
		select 
		case <-done:
			/*
			*程序结束,需要先吧fileSizes这个channel中的内容排空,以保证对walkDir的调用不会被向fileSizes发送信息阻塞住,
			*可以正确地完成,防止goroutine泄露。
			 */
			for range fileSizes 
			
		case size, ok := <-fileSizes:
			if !ok  //fileSizes这个channel已关闭,退出循环loop
				break loop
			
			nfiles++
			nbytes += size
		case <-tick:
			printDiskUsage(nfiles, nbytes)
		
	


//打印已经计算的文件数量与大小
func printDiskUsage(nfiles, nbytes int64) 
	fmt.Printf("%d files  %.1f GB\\n", nfiles, float64(nbytes)/1e9)

 

以上是关于Goroutines和Channels的主要内容,如果未能解决你的问题,请参考以下文章

golang 使用goroutines和channel异步获取url

[日常] Go语言圣经-Goroutines和线程

Goroutines和Channels

Goroutines和Channels

Goroutines和Channels

如何确保在 goroutines 中启动的 goroutines 彼此同步?