白话 Golang 协程池

Posted 恋喵大鲤鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了白话 Golang 协程池相关的知识,希望对你有一定的参考价值。

在这里插入图片描述

1.何为并发

并发指在一段时间内有多个任务(程序,线程,协程等)被同时执行。注意,不是同一时刻。

在单处理机系统中,每一时刻仅能有一个任务被执行,故微观上这些任务只能是分时地交替执行。倘若在计算机系统中有多个处理机,则这些可以并发执行的任务被分配到多个处理机上同时执行,这就实现了并行。

并发 (Concurrency) 和 并行 ( Parallelism) 是不同的。这里引用 Erlang 之父 Joe Armstrong 对并发与并行区别的形象描述:
在这里插入图片描述

2.并发的好处

并发最直接的好处就是高效。

假如我们要做三件事情,一是吃饭,二是洗脚,三是看电视。如果串行执行,那么非常浪费时间。

如果改成并发执行,吃饭,洗脚,看电视同时进行,则非常节省时间。

3.Go 如何并发

Go 为并发而生。

Go 程(goroutine)是由 Go 运行时管理的轻量级线程。通过它我们可以轻松实现并发编程。

package main

import (
	"fmt"
	"sync"
	"time"
)

func eatFood() {
	fmt.Println("eat cost 3 seconds")
	time.Sleep(3 * time.Second)
}

func washFeet() {
	fmt.Println("wash feet cost 3 seconds")
	time.Sleep(3 * time.Second)
}

func watchTV() {
	fmt.Println("watch tv cost 3 seconds")
	time.Sleep(3 * time.Second)
}

func worker(task func(), wg *sync.WaitGroup) {
	defer wg.Done()
	task()
}

func main() {
	var wg sync.WaitGroup
	wg.Add(3)

	// 并发执行
	start := time.Now().Unix()
	go worker(eatFood, &wg)  // 创建一个 goroutine 执行吃饭任务
	go worker(washFeet, &wg) // 创建一个 goroutine 执行洗脚任务
	go worker(watchTV, &wg)  // 创建一个 goroutine 执行看电视任务
	wg.Wait()                // 等待三个 goroutine 全部执行完成
	fmt.Printf("total only cost %v seconds", time.Now().Unix()-start)
}

运行输出:

wash feet cost 3 seconds
eat cost 3 seconds
watch tv cost 3 seconds
total only cost 3 seconds

4.G-P-M 调度模型

说到 Go 程,不得不说 Go 程的调度。

Go 优秀的并发性能得益于出色的基于 G-M-P 模型的 Go 程调度器,官方宣称用 Golang 写并发程序的时候随便起个成千上万的 goroutine 毫无压力。

Go 程的调度模型是 G-P-M,通过 P(Processor,逻辑处理器)将 G(Goroutine,用户线程)与 M(Machine,系统线程)解耦,我们可以随意开启多个 G 交由调度器分配到 P,再通过 P 将 G 交由 M 来完成执行。

Go 调度器的基本模型:
在这里插入图片描述

  • 每个 P 维护一个 G 的本地队列;

  • 当一个 G 被创建出来,或者变为可执行状态时,优先把它放到 P 的本地队列中,否则放到全局队列;

  • 当一个 G 在 M 里执行结束后,P 会从队列中把该 G 取出;如果此时 P 的队列为空,即没有其他 G 可以执行, M 会先尝试从全局队列寻找 G 来执行,如果全局队列为空,它会随机挑选另外一个 P,从它的队列里拿走一半 G 到自己的队列中执行。

注意: P 的数量在默认情况下,会被设定为 CPU 的核数。而 M 虽然需要跟 P 绑定执行,但数量上并不与 P 相等。这是因为 M 会因为系统调用或者其他事情被阻塞,因此随着程序的执行,M 的数量可能增长,而 P 在没有用户干预的情况下,则会保持不变。

5.Go 程的代价

Go 程是轻量级线程,使用简单,但它是 Go 为我们提供的免费午餐吗?

天下哪有免费的午餐,Go 程也不例外。

Go 程虽然轻量,但仍有开销。

Go 的开销主要是三个方面:创建(占用内存)、调度(增加调度器负担)和删除(增加 GC 压力)。

内存开销:

空间上,一个 Go 程占用约 2K 的内存,在源码src/runtime/runtime2.go里面,我们可以找到 Go 程的结构定义type g struct

调度开销:

时间上,协程调度也会有 CPU 开销。我们可以利用runntime.Gosched()让当前协程主动让出 CPU 去执行另外一个协程,下面看一下协程之间切换的耗时。

const NUM = 10000

func cal() {
	for i := 0; i < NUM; i++ {
		runtime.Gosched()
	}
}

func main() {
	// 只设置一个 Processor
	runtime.GOMAXPROCS(1)
	start := time.Now().UnixNano()
	go cal()
	for i := 0; i < NUM; i++ {
		runtime.Gosched()
	}
	end := time.Now().UnixNano()
	fmt.Printf("total %vns per %vns", end-start, (end-start)/NUM)
}

运行输出:

total 997200ns per 99ns

可见一次协程的切换,耗时大概在 100ns,相对于线程的微秒级耗时切换,性能表现非常优秀,但是仍有开销。

GC 开销:
创建 Go 程到运行结束,占用的内存资源是需要由 GC 来回收,如果无休止地创建大量 Go 程后,势必会造成对 GC 的压力。

package main

import (
	"fmt"
	"runtime"
	"runtime/debug"
	"sync"
	"time"
)

func createLargeNumGoroutine(num int, wg *sync.WaitGroup) {
	wg.Add(num)
	for i := 0; i < num; i++ {
		go func() {
			defer wg.Done()
		}()
	}
}

func main() {
	// 只设置一个 Processor 保证 Go 程串行执行
	runtime.GOMAXPROCS(1)
	// 关闭GC改为手动执行
	debug.SetGCPercent(-1)

	var wg sync.WaitGroup
	createLargeNumGoroutine(1000, &wg)
	wg.Wait()
	t := time.Now()
	runtime.GC() // 手动GC
	cost := time.Since(t)
	fmt.Printf("GC cost %v when goroutine num is %v\\n", cost, 1000)

	createLargeNumGoroutine(10000, &wg)
	wg.Wait()
	t = time.Now()
	runtime.GC() // 手动GC
	cost = time.Since(t)
	fmt.Printf("GC cost %v when goroutine num is %v\\n", cost, 10000)

	createLargeNumGoroutine(100000, &wg)
	wg.Wait()
	t = time.Now()
	runtime.GC() // 手动GC
	cost = time.Since(t)
	fmt.Printf("GC cost %v when goroutine num is %v\\n", cost, 100000)
}

运行输出:

GC cost 0s when goroutine num is 1000
GC cost 2.0027ms when goroutine num is 10000
GC cost 30.9523ms when goroutine num is 100000

当创建的 Go 程数量越多,GC 耗时越大。

上面的分析目的是为了尽可能地量化 goroutine 的开销。虽然官方宣称用 Golang 写并发程序的时候随便起个成千上万的 goroutine 毫无压力,但当我们起十万、百万甚至千万个 goroutine 呢?goroutine 轻量的开销将被放大。

6.协程池的作用

无休止地创建大量 goroutine,势必会因为对大量 go 程的创建、调度和销毁带来性能损耗。

为了解决这个问题,可以引入协程池。

使用协程池限制 Go 程的开辟个数在大型并发场景是有必要的,这也是性能优化方法中对象复用思想的一个具体应用。

7.简易协程池的设计&实现

一个简单的协程池可以这么设计。

(1)定义一个接口表示任务,每一个具体的任务实现这个接口。

(2)使用 channel 作为任务队列,当有任务需要执行时,将这个任务插入到队列中。

(3)开启固定的协程(worker)从任务队列中获取任务来执行。

结构如下:
在这里插入图片描述
上面这个协程池的特点:

(1)Go 程数量固定。可以将 worker 的数量设置为最大同时并发数 runtime.NumCPU()。

(2)Task 泛化。提供任务接口,支持多类型任务,不同业务场景下只要实现任务接口便可以提交到任务队列供 worker 调用。

(3)简单易用。设计简约,实现简单,使用方便。

下面给出一个实现和使用示例:

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// Task 任务接口
type Task interface {
	Execute()
}

// Pool 协程池
type Pool struct {
	TaskChannel chan Task // 任务队列
}

// NewPool 创建一个协程池
func NewPool(cap ...int) *Pool {
	// 获取 worker 数量
	var n int
	if len(cap) > 0 {
		n = cap[0]
	}
	if n == 0 {
		n = runtime.NumCPU()
	}

	p := &Pool{
		TaskChannel: make(chan Task),
	}

	// 创建指定数量 worker 从任务队列取出任务执行
	for i := 0; i < n; i++ {
		go func() {
			for task := range p.TaskChannel {
				task.Execute()
			}
		}()
	}
	return p
}

// Submit 提交任务
func (p *Pool) Submit(t Task) {
	p.TaskChannel <- t
}

// EatFood 吃饭任务
type EatFood struct {
	wg *sync.WaitGroup
}

func (e *EatFood) Execute() {
	defer e.wg.Done()
	fmt.Println("eat cost 3 seconds")
	time.Sleep(3 * time.Second)
}

// WashFeet 洗脚任务
type WashFeet struct {
	wg *sync.WaitGroup
}

func (w *WashFeet) Execute() {
	defer w.wg.Done()
	fmt.Println("wash feet cost 3 seconds")
	time.Sleep(3 * time.Second)
}

// WatchTV 看电视任务
type WatchTV struct {
	wg *sync.WaitGroup
}

func (w *WatchTV) Execute() {
	defer w.wg.Done()
	fmt.Println("watch tv cost 3 seconds")
	time.Sleep(3 * time.Second)
}

func main() {
	p := NewPool()
	var wg sync.WaitGroup
	wg.Add(3)
	task1 := &EatFood{
		wg: &wg,
	}
	task2 := &WashFeet{
		wg: &wg,
	}
	task3 := &WatchTV{
		wg: &wg,
	}
	p.Submit(task1)
	p.Submit(task2)
	p.Submit(task3)
	// 等待所有任务执行完成
	wg.Wait()
}

运行结果:

wash feet cost 3 seconds
watch tv cost 3 seconds
eat cost 3 seconds

设计时,我们也可以将任务队列中的任务设计为无参匿名函数,这样子使用起来可能会更简单。

// Pool 协程池
type Pool struct {
	TaskChannel chan func() // 任务队列
}

上面这个协程池,设计简约,实现和使用起来也比较简单方便,但是严格来说,其并不是一个成熟的协程池,因为并没有提供 worker 与 go 程池的状态控制能力,worker 数量也无法根据节点算力和业务晚高峰时进行动态的扩增和缩减。如果没有动态扩缩容的能力,那么很有可能出现 go 程的并发量不足以完全利用节点的算力,或者请求量不足的情况下,出现部分 go 程长期空闲的情况。

总地来说上面简易协程池的不足:
(1)无法知道 worker 与 pool 的状态;
(2)worker 数量不足无法动态扩增;
(3)worker 数量过多无法自动缩减。

8.开源协程池的使用

一个成熟的协程池应该具有如下能力:

(1)worker & pool 状态控制;
性能测试、任务超时等都需要知道和控制任务与 Go 程池的状态。

(2)无锁化操作;
在 worker 和 pool 的状态读写时使用 atomic 原子操作,避免多次上锁带来性能损耗。

(3)动态可伸缩;
根据实际请求量的大小,动态扩缩容以避免 Go 程数量出现过少或过多的情况。

(4)资源复用。
对回收的 worker 放到 sync.Pool 进行复用,而不是直接销毁,降低 GC 压力,提高性能。

工程实践中,建议使用业界开源成熟的协程池组件。

目前有很多第三方库实现了协程池,可以很方便地用来控制协程的并发数量,比较受欢迎的有:
(1)Jeffail/tunny
(2)panjf2000/ants

下面以 panjf2000/ants 为例,简单介绍其使用。

ants 是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。

还是以上面吃饭,洗脚和看电视三个任务为例,演示 ants 的使用。

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/panjf2000/ants"
)

func eatFood() {
	fmt.Println("eat cost 3 seconds")
	time.Sleep(3 * time.Second)
}

func washFeet() {
	fmt.Println("wash feet cost 3 seconds")
	time.Sleep(3 * time.Second)
}

func watchTV() {
	fmt.Println("watch tv cost 3 seconds")
	time.Sleep(3 * time.Second)
}

func main() {
	var wg sync.WaitGroup
	taskEatFood := func() {
		defer wg.Done()
		eatFood()
	}
	taskWashFeet := func() {
		defer wg.Done()
		washFeet()
	}
	taskWatchTV := func() {
		defer wg.Done()
		watchTV()
	}
	t := time.Now()
	wg.Add(3)
	// Use the common pool
	ants.Submit(taskEatFood)
	ants.Submit(taskWashFeet)
	ants.Submit(taskWatchTV)
	wg.Wait()
	fmt.Printf("total only cost %v\\n", time.Since(t))
}

运行输出:

watch tv cost 3 seconds
eat cost 3 seconds
wash feet cost 3 seconds
total only cost 3.0012512s

更多信息请参阅 github.com/panjf2000/ants

9.小结

资源复用是高性能编程的基本方法之一,在高并发场景,我们可以使用协程池来复用协程提高程序性能。

其他诸如:
无锁: 尽量不要加锁对某个变量读写,而应该分多个变量单独读写;

缓存: 网络IO是接口耗时的主要部分,对实时性要求不高的业务场景,可以增加本地缓存降低接口耗时;

减包: 限制请求时分页大小,减小回包大小,降低打解包对 CPU 的消耗。

这些方法都是编写高性能程序的有效手段,本文不过多探讨,感兴趣的同学可自行探索实践。


参考文献

[1] 知乎.Go 为什么这么快
[2] 潘建锋.Goroutine 并发调度模型深度解析之手撸一个高性能 goroutine 池
[3] Dmitry Vyukov.Go Preemptive Scheduler Design Doc
[4] 张彦飞.协程究竟比线程能省多少开销?
[5] 博客园.go runtime.Gosched()的作用分析
[6] 书栈网.GC 的认识
[7] Go 语言高性能编程.控制协程(goroutine)的并发数量

以上是关于白话 Golang 协程池的主要内容,如果未能解决你的问题,请参考以下文章

Golang协程池(workpool)实现

深入浅出Golang的协程池设计

[Golang]实现一个带有等待和超时功能的协程池 - 类似Java中的ExecutorService接口实现

[Golang]实现一个带有等待和超时功能的协程池 - 类似Java中的ExecutorService接口实现

[Golang]实现一个带有等待和超时功能的协程池 - 类似Java中的ExecutorService接口实现

Go协程与协程池