golang 高手才会的协程数量控制套路总结

Posted 文大侠666

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang 高手才会的协程数量控制套路总结相关的知识,希望对你有一定的参考价值。

尽管Goroutine(协程)非常清轻量,但是本身也是占用资源的,过多协程切换也会带来开销,总之物极必反,无限制的开协程的结果只会是Game Over。生产实践中必须考虑控制协程数量,本文带你看看针对不同场景和需求的协程数量控制方式,看看这些姿势你都会了吗?

场景

如下,go中一个典型场景是,接受数据然后开协程处理,代码如下

// runTaskDataGenerator 产生数据
func runTaskDataGenerator(dataChan chan int) 
	for i := 0; i < 100; i++ 
		dataChan <- i
	

	close(dataChan)


// runInfiniteTask 每来一个数据起协程处理任务
func runInfiniteTask(dataChan <-chan int) 
	var wg sync.WaitGroup

	for data := range dataChan 
		wg.Add(1)
		go func(data int) 
			defer wg.Done()

			// do something
			time.Sleep(3 * time.Second)
		(data)
	

	wg.Wait()


func TestRunInfiniteTask(t *testing.T) 
	dataChan := make(chan int)

	go runTaskDataGenerator(dataChan)
	go runNumGoroutineMonitor()

	runInfiniteTask(dataChan)

runNumGoroutineMonitor监控协程数量

// runNumGoroutineMonitor 协程数量监控
func runNumGoroutineMonitor() 
	log.Printf("协程数量->%d\\n", runtime.NumGoroutine())

	for 
		select 
		case <-time.After(time.Second):
			log.Printf("协程数量->%d\\n", runtime.NumGoroutine())
		
	

运行结果如下,可以看到**有多少数据就会起多少协程,如果任务处理时间较长,短时间可能出现大量协程**,耗尽资源,需要控制协程数量。

=== RUN TestRunInfiniteTask
2023/01/05 10:36:12 协程数量->6
2023/01/05 10:36:13 协程数量->105
2023/01/05 10:36:14 协程数量->105
— PASS: TestRunInfiniteTask (3.00s)

固定个数协程并发处理任务

一般叫做Bounded/Fixed并发控制。

  • 优点是简单,不复杂的并发任务这样简单处理即可。
  • 缺点在于dataChan可能流量不不均衡,需要同时处理的任务多少在变动,但是对应的协程数量保持不变,要不就是任务处理堵塞要不就是存在多余的协程空闲
// runBoundedTask 起maxTaskNum个协程共同处理任务
func runBoundedTask(dataChan <-chan int, maxTaskNum int) 
	var wg sync.WaitGroup
	wg.Add(maxTaskNum)

	for i := 0; i < maxTaskNum; i++ 
		go func() 
			defer wg.Done()

			for data := range dataChan 
				func(data int) 

					// do something
					time.Sleep(3 * time.Second)
				(data)
			
		()
	

	wg.Wait()

动态个数协程并发处理任务

针对固定个数协程的缺点,一个思路是协程数量最好能够根据来的处理任务的多少,动态变更,指定一个并发上限,任务多时增加协程数量,任务少时减少协程数量。这里提供两种实现思路

自定义令牌池实现

令牌池维持最大允许并发任务数个令牌,每个任务启动时请求令牌,运行完成返回令牌。

// runDynamicTask 
// 最大同时运行maxTaskNum个任务处理数据
// 自定义令牌池维持maxTaskNum个令牌供竞争
func runDynamicTask(dataChan <-chan int, maxTaskNum int) 
	// 初始化令牌池
	tokenPool := make(chan struct, maxTaskNum)
	for i := 0; i < maxTaskNum; i++ 
		tokenPool <- struct
	

	var wg sync.WaitGroup

	for data := range dataChan 
		// 先获取令牌,如果被消费完则阻塞等待其它任务返还令牌
		<-tokenPool

		wg.Add(1)
		go func(data int) 
			defer wg.Done()

			// 任务运行完成,返还令牌
			defer func() 
				tokenPool <- struct
			()

			// do something
			time.Sleep(3 * time.Second)
		(data)
	

	wg.Wait()

信号量Semaphore实现

同上,令牌池换成信号量。

// runSemaphoreTask 
// 最大同时运行maxTaskNum个任务处理数据
// 使用信号量维持maxTaskNum个信号
func runSemaphoreTask(dataChan <-chan int, maxTaskNum int64) 
	w := semaphore.NewWeighted(maxTaskNum)

	var wg sync.WaitGroup

	for data := range dataChan 
		// 先获取信号量,如果被消费完则阻塞等待信号量返还
		_ = w.Acquire(context.TODO(), 1)

		wg.Add(1)
		go func(data int) 
			defer wg.Done()

			// 运行完成返还信号量
			defer w.Release(1)

			// do something
			time.Sleep(3 * time.Second)
		(data)
	

	wg.Wait()

指定处理速度并发处理任务

针对固定个数协程的缺点,另一个思路是借鉴限流器的实现,控制每个时刻最大允许协程数量也达到控制协程数量的目的。这里也提供两种实现思路

自定义令牌池实现

相当于一个简单限流器,指定速度生产令牌,每个任务启动时必须请求到令牌。

// runRateLimitTask 限制每秒允许的最大协程数量,限流器的思路
func runRateLimitTask(dataChan <-chan int) 
	// 初始化令牌池
	tokenPool := make(chan struct)
	go func() 
		for 
			select 
			// 动态控制令牌生成速度
			case <-time.After(time.Second):
				tokenPool <- struct
			
		
	()

	var wg sync.WaitGroup

	for data := range dataChan 
		// 先获取令牌,如果被消费完则阻塞等待新令牌产生
		<-tokenPool

		wg.Add(1)
		go func(data int) 
			defer wg.Done()

			// do something
			time.Sleep(3 * time.Second)
		(data)
	

	wg.Wait()

官方限流器实现

逻辑同上,每个任务启动必须先获取令牌。

// runRateLimitTask2 限制每秒允许的最大协程数量,使用官方限流器
func runRateLimitTask2(dataChan <-chan int) 
	// 初始化令牌池
	limit := rate.Every(time.Second) // 每秒一个
	limiter := rate.NewLimiter(limit, 10)

	var wg sync.WaitGroup

	for data := range dataChan 
		// 先获取令牌,如果被消费完则阻塞等待新令牌产生
		_ = limiter.Wait(context.TODO())

		wg.Add(1)
		go func(data int) 
			defer wg.Done()

			// do something
			time.Sleep(3 * time.Second)
		(data)
	

	wg.Wait()

协程池并发处理任务

生产业务中,针对复杂业务或者不想那么麻烦,可以直接上协程池。
常用协程池https://github.com/panjf2000/ants,如下实现。

代码上看起来简洁很多,根本原理和动态个数协程控制思路差不多,后续单开一篇文章讲讲协程池的实现。

// runGoroutinePoolTask 使用协程池动态管理协程数量
func runGoroutinePoolTask(dataChan <-chan int, maxTaskNum int) 
	p, _ := ants.NewPool(maxTaskNum)
	defer p.Release()

	var wg sync.WaitGroup

	for _ = range dataChan 
		wg.Add(1)

		// 提交任务,协程池动态管理数量,可以做更多的分配优化策略
		_ = p.Submit(func() 
			defer wg.Done()

			// do something
			time.Sleep(3 * time.Second)
		)

	

	wg.Wait()

参考

演示代码 https://gitee.com/wenzhou1219/go-in-prod/tree/master/task_concurrency

https://zhuanlan.zhihu.com/p/568151296

以上是关于golang 高手才会的协程数量控制套路总结的主要内容,如果未能解决你的问题,请参考以下文章

golang 高手才会的协程数量控制套路总结

golang 高手才会的协程数量控制套路总结

golang协程调度模式解密

关于Python的协程问题总结

写一个 golang 风格的协程扩展

深入浅出Golang的协程池设计