Go语言 | 协程池的应用(可能是全网最适合小白的教程)

Posted “逛丢一只鞋”

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go语言 | 协程池的应用(可能是全网最适合小白的教程)相关的知识,希望对你有一定的参考价值。

文章目录

前言

为什么说这是全网最适合小白的呢?因为我就是一个第一次写多线程,使用协程池的小白,自己明白这里面的入门不易,特此分享给大家

需求分析

在我们的服务中,有这么一个功能,需要函数先去遍历一个文件夹,这个文件夹可能有很多的子目录,然后读取目录下的所有json文件,并对其进行schema验证。

原始代码,是将上述需求通过串行的方式进行实现,也就是遍历,读到一个json文件后,就对其进行schema校验

在业务量小,即json文件少的情况下这种方法一点问题没有,但是当json文件激增到五六百个时,程序可能就要耗费七八秒的时间,这是不能接受的

目前改造计划通过多线程的方式,进行功能的划分,并发的执行读取任务,利用Go语言高并发的机制加速程序服务的运行

设计功能:

  • 分离文件遍历与文件读取校验的功能
  • 设计协程池,文件遍历作为主协程,文件读取校验作为worker协程
  • 通过chan通道传递文件名与校验结果

此前我主要使用C语言编程,对于多线程的使用可以说,一点没有,能规避的就规避,目前使用Go语言,多线程是学习和使用是躲不过了,这次还有这么一个协程池的概念

协程池和多线程相关概念

首先介绍协程池是什么,为什么不直接用多线程。

go协程池(goroutine)

Go语言中的goroutine虽然相对于系统线程来说比较轻量级(初始栈大小仅2KB),但是在高并发量下的goroutine频繁创建和销毁对于性能损耗以及GC来说压力也不小

很多情况下,我们需要考虑如下问题:

  1. 限制并发的goroutine数量;
  2. 复用goroutine,减轻runtime调度压力,提升程序性能;
  3. 规避过多的goroutine侵占系统资源(CPU&内存)。

go协程池

如果无休止的开辟Goroutine依然会出现高频率的调度Groutine,那么依然会浪费很多上下文切换的资源,导致做无用功。

所以设计一个Goroutine池限制Goroutine的开辟个数在大型并发场景还是必要的。

package main
 
import (
	"fmt"
	"time"
)
 
/* 有关Task任务相关定义及操作 */
//定义任务Task类型,每一个任务Task都可以抽象成一个函数
type Task struct 
	f func() error //一个无参的函数类型

 
//通过NewTask来创建一个Task
func NewTask(f func() error) *Task 
	t := Task
		f: f,
	
	return &t

 
//执行Task任务的方法
func (t *Task) Execute() 
	t.f() //调用任务所绑定的函数

 
/* 有关协程池的定义及操作 */
//定义池类型
type Pool struct 
	EntryChannel chan *Task //对外接收Task的入口
	worker_num   int        //协程池最大worker数量,限定Goroutine的个数
	JobsChannel  chan *Task //协程池内部的任务就绪队列

 
//创建一个协程池
func NewPool(cap int) *Pool 
	p := Pool
		EntryChannel: make(chan *Task),
		worker_num:   cap,
		JobsChannel:  make(chan *Task),
	
	return &p

 
//协程池创建一个worker并且开始工作
func (p *Pool) worker(work_ID int) 
	//worker不断的从JobsChannel内部任务队列中拿任务
	for task := range p.JobsChannel 
		//如果拿到任务,则执行task任务
		task.Execute()
		fmt.Println("worker ID ", work_ID, " 执行完毕任务")
	

 
//让协程池Pool开始工作
func (p *Pool) Run() 
	//1,首先根据协程池的worker数量限定,开启固定数量的Worker,
	//  每一个Worker用一个Goroutine承载
	for i := 0; i < p.worker_num; i++ 
		fmt.Println("开启固定数量的Worker:", i)
		go p.worker(i)
	
 
	//2, 从EntryChannel协程池入口取外界传递过来的任务
	//   并且将任务送进JobsChannel中
	for task := range p.EntryChannel 
		p.JobsChannel <- task
	
 
	//3, 执行完毕需要关闭JobsChannel
	close(p.JobsChannel)
	fmt.Println("执行完毕需要关闭JobsChannel")
 
	//4, 执行完毕需要关闭EntryChannel
	close(p.EntryChannel)
	fmt.Println("执行完毕需要关闭EntryChannel")

 
//主函数
func main() 
	//创建一个Task
	t := NewTask(func() error 
		fmt.Println("创建一个Task:", time.Now().Format("2006-01-02 15:04:05"))
		return nil
	)
 
	//创建一个协程池,最大开启3个协程worker
	p := NewPool(3)
 
	//开一个协程 不断的向 Pool 输送打印一条时间的task任务
	go func() 
		for 
			p.EntryChannel <- t
		
	()
 
	//启动协程池p
	p.Run()

上述代码通过一个简单的例子说明了协程池的基本工作原理,但是如果上述框架要应用在实际工程中,还有许多的不足,因此,go协程池也是有库函数的存在

ants库

ants是一个受fasthttp启发的高性能协程池,fasthttp号称是比go原生的net/http快10倍,其原因之一就是采用了各种池化技术, ants相比之前两种协程池,其模型更像是之前接触到的数据库连接池,需要从空余的worker中取出一个来执行任务, 当无可用空余worker的时候再去创建,而当pool的容量达到上线之后,剩余的任务阻塞等待当前进行中的worker执行完毕将worker放回pool, 直至pool中有空闲worker。

ants在内存的管理上做得很好,除了定期清除过期worker(一定时间内没有分配到任务的worker),ants还实现了一种适用于大批量相同任务的pool, 这种pool与一个需要大批量重复执行的函数锁绑定,避免了调用方不停的创建,更加节省内存。

ants Git仓库地址

如果可以看懂这个库的使用,还是推荐通过第三方维护的库来进行实现,可以规避很多初次使用遇到的问题。

package main
 
import (
	"fmt"
	"github.com/panjf2000/ants"
	"sync"
	"time"
)
 
//任务
func sendMail(i int, wg *sync.WaitGroup) func() 
	var cnt int
	return func() 
		for 
			time.Sleep(time.Second * 2)
			fmt.Println("send mail to ", i)
			cnt++
			if cnt > 5 && i == 1 
				fmt.Println("退出协程ID:", i)
				break
			
		
		wg.Done()
	

 
func main() 
	wg := sync.WaitGroup
 
	//申请一个协程池对象
	pool, _ := ants.NewPool(2)
 
	//关闭协程池
	defer pool.Release()
 
	// 向pool提交任务
	for i := 1; i <= 5; i++ 
		pool.Submit(sendMail(i, &wg))
		wg.Add(1)
	
	wg.Wait()

源码中提到, ants的吞吐量能够比原生groutine高出N倍,内存节省10到20倍。

实现

在我的实际设计中,因为各种原因,我没有使用ants库,而是通过其原理进行了设计

因为牵扯到具体的功能,直接看可能会比较懵,但我想说明的就是,我们需要关注的点

func loadConfigData(paths []string, schema map[string]*gojsonschema.Schema,
	loadSchemaCheck bool, poolNumber int) 

	var wg sync.WaitGroup
	var mapGuard sync.Mutex
	fileinfochan := make(chan fileInfoChan, 100)

	for i := 0; i < poolNumber; i++ 
		go func() 
			for fileinfo := range fileinfochan 
				switch fileinfo.jsonType 
				case hub.OTHER_TYPE_TEMPALTE:
					_ = loadOtherDataTempParse(fileinfo, &mapGuard)
					wg.Done()
				case hub.OTHER_TYPE_PLUGIN:
					_ = loadOtherDataPluginParse(fileinfo, &mapGuard)
					wg.Done()
				case hub.OTHER_TYPE_YAML:
					_ = loadOtherDataYamlParse(fileinfo, &mapGuard)
					wg.Done()
				default:
					loadJsonDefDataParse(fileinfo.jsonType, fileinfo.fileName, fileinfo.fileNamePath,
						schema, loadSchemaCheck, &mapGuard)
					wg.Done()
				
			
		()
	

	logger.LogS().Debugln("加载API def文件...")
	for i := hub.JSON_TYPE_PRIVATE; i <= hub.JSON_TYPE_SCHEDULE; i++ 
		loadJsonDefData(i, paths[i], true, schema, loadSchemaCheck, fileinfochan, &wg)
	

	wg.Wait()
	close(fileinfochan)

... ...
// 遍历tml目录及子目录
func loadOtherDataTemp(path string, fileinfochan chan fileInfoChan, jsonType int, wg *sync.WaitGroup) error 
	logger.LogS().Debugln("加载template(*.tml)文件: ")
	err := filepath.Walk(path, func(fileNamePath string, f os.FileInfo, err error) error 
		if strings.Contains(fileNamePath, ".tmpl") 
			fileinfo := fileInfoChan
				fileNamePath: fileNamePath,
				jsonType:     jsonType,
			
			wg.Add(1)
			fileinfochan <- fileinfo
		
		return nil
	)
	if err != nil 
		logger.LogS().Errorln(err.Error())
		return err
	
	logger.LogS().Debugln("加载(*.tml)文件完成!\\r\\n")
	return nil

主要关注如下代码和变量

sync.WaitGroup

使用等待组进行多个任务的同步,等待组可以保证在并发环境中完成指定数量的任务

在 sync.WaitGroup(等待组)类型中,每个 sync.WaitGroup 值在内部维护着一个计数,此计数的初始默认值为零。

var wg sync.WaitGroup

在使用中,我们每往通道里发送一个数据,就是用wg.Add(1)进行计数加1,每在通道里消费一个数据,就使用wg.Done()进行计数减1。在主函数结束时,通过wg.Wait()让程序等待通道中不再有发送,且已经消费完数据后再退出,即Wait() 会阻塞代码的运行,直到计数器地值减为0。

sync.Mutex

var mapGuard sync.Mutex

对于需要在多个线程里互斥访问的资源,可以使用sync.Mutex来操作,对应的是Lock和Unlock两个方法就是加锁和解锁。

Go语言中对于map的操作,必须通过互斥锁

var mutex sync.Mutex                //互斥锁
func printer(str string)
    mutex.Lock()                //加锁
    defer mutex.Unlock()        //在defer语句里解锁,这样就可以保证在函数退出时释放。
    for _,ch:=range str
        fmt.Printf("%c",ch)
        time.Sleep(time.Millisecond*100)
    

chan

创建chan的结构体,根据实际情况,创建结构体变量

初始化过程中,创建了100个缓存通道,在结束时,要使用close对chan进行关闭

type fileInfoChan struct 
	jsonType     int
	fileName     string
	fileNamePath string


fileinfochan := make(chan fileInfoChan, 100)

close(fileinfochan)

pool

for i := 0; i < poolNumber; i++ 
		go func() 
		

pool就是我们常说的协程池,这里我们来存放具体的任务,在设计中即解析json文件并验证

我们要在主函数中提前设置好建立的pool数量,也就是协程池数

假设我们poolNumber = 20,也就是协程池数为20个,我们创建二十个线程,作为任务处理的线程

后续任务在chan排队进入线程池运行,消费chan中的任务

chan <-

信息如何进入通道排队,这里我们对文件夹进行遍历,每遍历到一个文件信息,我们就通过 <-的方式传递到chan中进行排队,等待线程池的消耗

if strings.Contains(fileNamePath, ".tmpl") 
			fileinfo := fileInfoChan
				fileNamePath: fileNamePath,
				jsonType:     jsonType,
			
			wg.Add(1)
			fileinfochan <- fileinfo
		

以上是关于Go语言 | 协程池的应用(可能是全网最适合小白的教程)的主要内容,如果未能解决你的问题,请参考以下文章

深入浅出Golang的协程池设计

白话 Golang 协程池

白话 Golang 协程池

Goroutine并发调度模型深度解析之手撸一个协程池

Go协程与协程池

Go协程与协程池