Go并发模式:管道与取消

Posted 一面千人

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go并发模式:管道与取消相关的知识,希望对你有一定的参考价值。

关键字:Go语言,管道,取消机制,并发,sync.WaitGroup,包引用,通道,defer,select

GO并发模式:管道与取消

简介

Go的并发能力可以使构建一个流数据管道变得非常容易,并且可以高校地使用机器I/O和多核处理器。这篇文章展示了一些例子,包括管道,对操作失败的处理技术。

管道的概念

在Go里,并没有正式的管道的定义,它只是众多并发程序其中的一个。通俗来讲,一个管道是一系列由通道连接的阶段,每个阶段都是一组运行着同样函数的goroutine。在每个阶段里,goroutine在干着:

  1. 通过接入通道(inbound channels)接收上游流下来的值
  2. 对这些数据执行某个函数,通常会产生新的值
  3. 通过导出通道(outbound channels)下游发送值
  • 第一个阶段也叫source或者producer
  • 最后一个阶段也叫sink或者consumer
    以上这两个阶段都只能有一个通道,或者是接入通道或者是导出通道,不能同时拥有这两种。而其他每个阶段都可以共同拥有任意数量的接入通道和导出通道。

一个用来学习的例子

下面我们将展开一个简单的管道例子,来阐述其中的思想和技术,后面会有实际的例子。

平方函数

直接看代码中注释。

注意goroutine是函数体内并发,有一个壳sandbox扣着它。

// 要想run,必须package main,默认是文件夹目录名,要更改一下
package main

import "fmt"

// 设想一个拥有三个阶段的管道

/*
 * First Stage: gen
 * params: 一个以逗号分隔的整数列表,数量不限
 * return: 一个通道,包含参数中整数列表的通道
 */
func gen(nums ... int) <-chan int {
	out := make(chan int)
	// 通过一个goroutine来将参数中的每个整数发送到通道中去。
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out) // close方法作为上面的for循环的终止条件,不能省略。
	}()
	return out
}

/*
 * Second Stage: sq
 * params: 一个包含参数中整数列表的通道
 * return: 一个包含将参数通道中每个整数平方后的列表的通道
 * note: 因为参数和返回值的类型都是相同的整型通道,所以可以反复嵌套该方法。
 */
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n // 平方
		}
		close(out)
	}()
	return out
}

/*
 * Final Stage: main
 * 是一个main函数,没有参数也没有返回值,它相当于客户端调用
 */
func main() {
	c := gen(2, 3) // 建立通道
	out := sq(c)   // 通道处理
	// 上面传入两个值2和3,那么这里就要对应的消费两次输出
	fmt.Println(<-out)
	fmt.Println(<-out)
	// 嵌套sq
	for n := range sq(sq(gen(1, 2, 4, 5))) {
		fmt.Println(n)
	}
}

// output:
//  4
//	9
//	1
//	16
//	256
//	625

Fan-out和Fan-in

  • Fan-out,扇出。多个函数可以读取同一个通道直到该通道关闭。可让一群工人并用CPU和IO
  • Fan-in,扇入。一个函数可以读取多个输入,每个输入被多路复用到一个独立的通道上,当所有输入被关闭时,这个通道也会被关闭,同时它也会关掉这个函数的使用权。

下面我们将运行连个sq函数的实例,都会读取同一个输入通道,我们将使用一个新函数,叫做merge,来扇入多个结果。

向一个已关闭的通道发送值,会引起通道panic错误,所以引入了sync.WaitGroup功能来控制当所有发送行为结束以后关闭通道。

sync.WaitGroup

sync.WaitGroup像java的倒计时锁,首先我们定义它的Wait方法设置一个锁到某个并发程序中,然后通过Add方法定义计数器大小CounterSize,该大小为最多发送数据到通道的执行次数,每次执行结束要通过Done方法来使CounterSize减一,直到CounterSize为0,上面我们定义的Wait才会释放锁。

注意,WaitGroup的计数器大小CounterSize在初始化时默认为1,也就是说没调用Add之前,需要一次Done方法执行以后,Wait锁才会释放。

merge函数

func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	// 定义一个独立通道out,将接收所有值
	out := make(chan int)

	// 将通道中所有值转到out
	output := func(c <-chan int) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}

	wg.Add(len(cs))
	// 将merge参数中所有通道的值都合到唯一通道out上去
	for _, c := range cs {
		go output(c)
	}

	// 启动一个额外的goroutine(不会按照代码顺序执行,而是一进到merge就会启动)来等待直到所有通道Done以后关闭那个唯一通道out。
	go func() {
		wg.Wait()// 直到wg全都Done了才会继续执行。
		close(out)
	}()
	return out
}

Go的包引用问题

当我们要使用其他Go文件内部的函数时,会有两种处理方法:

  1. 将函数绑定到某个type下,然后在调用时创建那个type的实例即可调用。
  2. 将函数名首字母大写,我们就可以通过包名调用了。

以上两种方法都会存在一个问题,就是包引用问题,如果你找不到源码位置,调用其函数就无从谈起,那么如何正确的引用包呢?

注意,最容易引发混乱的是main函数,因为main函数是可执行Go文件的必须元素,同时必须是指定package也为main,因此我们尽量不要在main函数所在的Go文件中添加与main无关的内容,否则我们很难通过包名或者文件名定位函数的意思。

注意,Go中最没用的就是Go文件名了,包引用都是通过package。

正确的引用包是:将被调用函数所在文件,声明package为其所在文件夹名字,

注意,所有的该文件夹下的Go文件的package声明必须为同一个,不能出现第二个值,对外部调用者来讲,这些文件好似在一起,都是从一个package读取,并无区分。

然后在调用函数的地方import进来被调用函数声明的package即可。

所以总结一下,文件夹名即包名,文件夹内给Go文件起名要能够解释清楚文件内容,main函数文件指定到有意义的文件夹下,导入所需函数包。

main函数

func main() {
	in := pipeline.Gen(2, 3)
	c1 := pipeline.Sq(in)
	c2 := pipeline.Sq(in)
	// 将c1和c2通道内的值合并到一起
	for n := range merge(c1, c2) {
		fmt.Println(n)
	}
}

// Output:
//	4
//	9

等价于

out:= merge(c1,c2)
fmt.Println(<-out)
fmt.Println(<-out)
fmt.Println(<-out)// 第三次输出,通道已无值,输出零值,如果通道输出完毕时即关闭的话,这一行会报错
// Output:
//	4
//	9
//	0

发现问题?发送次数少于接收次数

上面的管道函数有一个模式:

  • 所有的发送操作完成时,阶段会关闭他们的导出通道。
  • 阶段会一直从导入通道中接收值,直到那些通道被关闭。

这个模式允许每个接收的阶段可以被作为一个range循环写入,并且保证一旦所有的值都已经成功发送下游,所有的goroutine退出。

但是在真实的管道里,阶段不会总是能接收到所有的导入值。有时候这是由于一个设计:

接收者可能只需要一个值的子集来取得进展。

更常见的是,一个阶段早早退出是因为一个导入值代表了一个更早阶段的error。在这两种情况下,接收方不应该等待其余值的到达,并且我们想要更早的阶段来停止那些后期阶段不需要的生产时期的值。

在我们的例子中,

out:= merge(c1,c2)
fmt.Println(<-out)
// Output:
//	4

实际上out通道中还有一个9没有被输出,通道的值此时没有被完全消费,这时goroutine就会不断尝试发送该值,并且会被无限期阻塞。

这是一个资源泄露,goroutine消耗内存和运行时资源,并且在goroutine栈中的堆引用会一直防止数据被垃圾收集器回收。goroutine不可被垃圾收集,只能必须靠自己exit。

所以,我们需要找到方式,能够在下游阶段接收所有导入值失败的时候,上游阶段的管道仍旧能够退出:

  1. 一种方式是改变导出通道让它又有一个buffer缓冲区,一个缓冲区能够持有一个固定数量的值,如果缓冲区内仍有空间,发送操作就立即完成。

缓冲区的内容我们在前面的文章中有仔细介绍。

总之就是可以释放goroutine的发送操作到缓冲区,不会被无限期阻塞。

在我们的管道中,返回到被阻塞的goroutine,我们可能考虑到添加一个缓冲区到merge函数返回的导出通道:

out := make(chan int, 1)// 增加一个缓冲区,可以存放通道中未发送的值

但是问题仍在发生,我们这里是因为知道我们上面只写了一遍发送,而通道已知有两次接收值,所以我们可以这么干,但是这个代码是不好的,易碎的,一旦条件发生改变,就要对代码进行调整。

因此,我们需要为下游阶段提供一种方式来象征发送者,来停止接收输入。

明确的取消机制

当main函数决定退出,而不再接收任何out通道的值的时候,它必须告诉上游的goroutine,放弃他们试图发送的值。

在一个通道中如此操作发送值,被称作done。

它发送两个值因为有两个潜在的阻塞发送者。

我们修改merge,给它加入一个参数是struct{}结构体通道。然后修改merge中的output函数,将原来的

out <- n:

替换为:

select {
case out <- n:
case <-done:
}

意思是:如果n还有未发送的值,就正常发送,如果done有未发送的值就发送done。然后我们再修改一下main函数:

done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out)
done <- struct{}{}
done <- struct{}{}

当out只被输出一次的时候,此时循环还剩两次(总共三次,因为merge函数的参数有三个通道,会循环三次),为了避免循环阻塞在out输出的位置,我们给done通道传入了结构体零值,merge函数中那个循环就会放弃发送out值,而去执行done的发送。

但是问题仍在继续,这里仍旧是因为我们预知通道接收次数,以及发送放空次数,所以可以写出这个顺序和次数,这仍旧是易碎的,本质上除了让我们学习了一下这种写法,与上面发生的无异。

我们需要一种方式,可以在未知goroutine数量,未知通道大小的情况下,随时按需阻止下游阶段发送未发送完毕的通道。

因为接收操作在一个封闭的通道可以总是立即执行,产生类元素的零值。

这就意味着main函数能够对所有被done通道关闭的发送者解除阻塞。这实际上是一个广播信号发送者。我们扩展管道功能的each来接收done作为一个参数来安排通过defer来延迟关闭,以便所有的main函数的返回路径能够发送信号到管道阶段去退出。

先看merge函数:

defer wg.Done()
for n := range c {
	select {
	case out <- n:
	case <-done:
		return
	}
}

我们在for循环前面加入了一行sync.WaitGroup的Done的延迟方法,然后修改了select内部当done可被输出时,直接结束merge函数(更别提跳出循环了),直接执行defer的wg.Done去掉一次计数器数值。然后看main函数:

func main() {
	done := make(chan struct{})
	defer close(done)
	in := pipeline.Gen(2, 3)
	c1 := pipeline.Sq(in)
	c2 := pipeline.Sq(in)
	out := merge(done, c1, c2)
	fmt.Println(<-out)
}

首先我们去掉了done通道的缓冲区,加了一行关闭done通道的延迟操作。当代码执行玩fmt的一次输出以后,main函数执行完毕,会调用defer关闭done通道,回到merge函数中,done通道被关闭以后,case ->done被执行merge函数执行完毕,执行wg.Done()。

总结

本文详细阐述了Go管道的概念,是有三组动作:生产通道,处理通道,使用通道,这三组动作实现了Go的管道。通过一个例子我们搞清楚了管道的含义,接着又介绍了Fan-out,是关于多个函数对同一个通道的操作,以及一个函数对多个通道的操作(例子中使用了merge,将多个通道合并为一个)。这期间,我们研究了sync.WaitGroup以及Go语言中的包引用特性。最后,我们在例子中发现了管道并发的问题,并循序渐进地找到了解决方法,在此期间,让我们加深了对defer,管道,通道,select的理解。

参考资料

  • Go官方文档

源码位置

更多请转到醒者呆的博客园

以上是关于Go并发模式:管道与取消的主要内容,如果未能解决你的问题,请参考以下文章

Go并发模型:流水线与取消(Pipelines and cancellation译文)

Go并发模型:流水线与取消(Pipelines and cancellation译文)

Golang并发(Go程管道)

Go语言管道

gin go并发案例

Go语言协程并发---管道信号量应用