Go语言 | 协程池的应用(可能是全网最适合小白的教程)
Posted “逛丢一只鞋”
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go语言 | 协程池的应用(可能是全网最适合小白的教程)相关的知识,希望对你有一定的参考价值。
文章目录
前言
为什么说这是全网最适合小白的呢?因为我就是一个第一次写多线程,使用协程池的小白,自己明白这里面的入门不易,特此分享给大家
需求分析
在我们的服务中,有这么一个功能,需要函数先去遍历一个文件夹,这个文件夹可能有很多的子目录,然后读取目录下的所有json文件,并对其进行schema验证。
原始代码,是将上述需求通过串行的方式进行实现,也就是遍历,读到一个json文件后,就对其进行schema校验
在业务量小,即json文件少的情况下这种方法一点问题没有,但是当json文件激增到五六百个时,程序可能就要耗费七八秒的时间,这是不能接受的
目前改造计划通过多线程的方式,进行功能的划分,并发的执行读取任务,利用Go语言高并发的机制加速程序服务的运行
设计功能:
- 分离文件遍历与文件读取校验的功能
- 设计协程池,文件遍历作为主协程,文件读取校验作为worker协程
- 通过chan通道传递文件名与校验结果
此前我主要使用C语言编程,对于多线程的使用可以说,一点没有,能规避的就规避,目前使用Go语言,多线程是学习和使用是躲不过了,这次还有这么一个协程池的概念
协程池和多线程相关概念
首先介绍协程池是什么,为什么不直接用多线程。
go协程池(goroutine)
Go语言中的goroutine虽然相对于系统线程来说比较轻量级(初始栈大小仅2KB),但是在高并发量下的goroutine频繁创建和销毁对于性能损耗以及GC来说压力也不小
很多情况下,我们需要考虑如下问题:
- 限制并发的goroutine数量;
- 复用goroutine,减轻runtime调度压力,提升程序性能;
- 规避过多的goroutine侵占系统资源(CPU&内存)。
go协程池
如果无休止的开辟Goroutine依然会出现高频率的调度Groutine,那么依然会浪费很多上下文切换的资源,导致做无用功。
所以设计一个Goroutine池限制Goroutine的开辟个数在大型并发场景还是必要的。
package main
import (
"fmt"
"time"
)
/* 有关Task任务相关定义及操作 */
//定义任务Task类型,每一个任务Task都可以抽象成一个函数
type Task struct
f func() error //一个无参的函数类型
//通过NewTask来创建一个Task
func NewTask(f func() error) *Task
t := Task
f: f,
return &t
//执行Task任务的方法
func (t *Task) Execute()
t.f() //调用任务所绑定的函数
/* 有关协程池的定义及操作 */
//定义池类型
type Pool struct
EntryChannel chan *Task //对外接收Task的入口
worker_num int //协程池最大worker数量,限定Goroutine的个数
JobsChannel chan *Task //协程池内部的任务就绪队列
//创建一个协程池
func NewPool(cap int) *Pool
p := Pool
EntryChannel: make(chan *Task),
worker_num: cap,
JobsChannel: make(chan *Task),
return &p
//协程池创建一个worker并且开始工作
func (p *Pool) worker(work_ID int)
//worker不断的从JobsChannel内部任务队列中拿任务
for task := range p.JobsChannel
//如果拿到任务,则执行task任务
task.Execute()
fmt.Println("worker ID ", work_ID, " 执行完毕任务")
//让协程池Pool开始工作
func (p *Pool) Run()
//1,首先根据协程池的worker数量限定,开启固定数量的Worker,
// 每一个Worker用一个Goroutine承载
for i := 0; i < p.worker_num; i++
fmt.Println("开启固定数量的Worker:", i)
go p.worker(i)
//2, 从EntryChannel协程池入口取外界传递过来的任务
// 并且将任务送进JobsChannel中
for task := range p.EntryChannel
p.JobsChannel <- task
//3, 执行完毕需要关闭JobsChannel
close(p.JobsChannel)
fmt.Println("执行完毕需要关闭JobsChannel")
//4, 执行完毕需要关闭EntryChannel
close(p.EntryChannel)
fmt.Println("执行完毕需要关闭EntryChannel")
//主函数
func main()
//创建一个Task
t := NewTask(func() error
fmt.Println("创建一个Task:", time.Now().Format("2006-01-02 15:04:05"))
return nil
)
//创建一个协程池,最大开启3个协程worker
p := NewPool(3)
//开一个协程 不断的向 Pool 输送打印一条时间的task任务
go func()
for
p.EntryChannel <- t
()
//启动协程池p
p.Run()
上述代码通过一个简单的例子说明了协程池的基本工作原理,但是如果上述框架要应用在实际工程中,还有许多的不足,因此,go协程池也是有库函数的存在
ants库
ants是一个受fasthttp启发的高性能协程池,fasthttp号称是比go原生的net/http快10倍,其原因之一就是采用了各种池化技术, ants相比之前两种协程池,其模型更像是之前接触到的数据库连接池,需要从空余的worker中取出一个来执行任务, 当无可用空余worker的时候再去创建,而当pool的容量达到上线之后,剩余的任务阻塞等待当前进行中的worker执行完毕将worker放回pool, 直至pool中有空闲worker。
ants在内存的管理上做得很好,除了定期清除过期worker(一定时间内没有分配到任务的worker),ants还实现了一种适用于大批量相同任务的pool, 这种pool与一个需要大批量重复执行的函数锁绑定,避免了调用方不停的创建,更加节省内存。
如果可以看懂这个库的使用,还是推荐通过第三方维护的库来进行实现,可以规避很多初次使用遇到的问题。
package main
import (
"fmt"
"github.com/panjf2000/ants"
"sync"
"time"
)
//任务
func sendMail(i int, wg *sync.WaitGroup) func()
var cnt int
return func()
for
time.Sleep(time.Second * 2)
fmt.Println("send mail to ", i)
cnt++
if cnt > 5 && i == 1
fmt.Println("退出协程ID:", i)
break
wg.Done()
func main()
wg := sync.WaitGroup
//申请一个协程池对象
pool, _ := ants.NewPool(2)
//关闭协程池
defer pool.Release()
// 向pool提交任务
for i := 1; i <= 5; i++
pool.Submit(sendMail(i, &wg))
wg.Add(1)
wg.Wait()
源码中提到, ants的吞吐量能够比原生groutine高出N倍,内存节省10到20倍。
实现要点
在我的实际设计中,因为各种原因,我没有使用ants库,而是通过其原理进行了设计
因为牵扯到具体的功能,直接看可能会比较懵,但我想说明的就是,我们需要关注的点
func loadConfigData(paths []string, schema map[string]*gojsonschema.Schema,
loadSchemaCheck bool, poolNumber int)
var wg sync.WaitGroup
var mapGuard sync.Mutex
fileinfochan := make(chan fileInfoChan, 100)
for i := 0; i < poolNumber; i++
go func()
for fileinfo := range fileinfochan
switch fileinfo.jsonType
case hub.OTHER_TYPE_TEMPALTE:
_ = loadOtherDataTempParse(fileinfo, &mapGuard)
wg.Done()
case hub.OTHER_TYPE_PLUGIN:
_ = loadOtherDataPluginParse(fileinfo, &mapGuard)
wg.Done()
case hub.OTHER_TYPE_YAML:
_ = loadOtherDataYamlParse(fileinfo, &mapGuard)
wg.Done()
default:
loadJsonDefDataParse(fileinfo.jsonType, fileinfo.fileName, fileinfo.fileNamePath,
schema, loadSchemaCheck, &mapGuard)
wg.Done()
()
logger.LogS().Debugln("加载API def文件...")
for i := hub.JSON_TYPE_PRIVATE; i <= hub.JSON_TYPE_SCHEDULE; i++
loadJsonDefData(i, paths[i], true, schema, loadSchemaCheck, fileinfochan, &wg)
wg.Wait()
close(fileinfochan)
... ...
// 遍历tml目录及子目录
func loadOtherDataTemp(path string, fileinfochan chan fileInfoChan, jsonType int, wg *sync.WaitGroup) error
logger.LogS().Debugln("加载template(*.tml)文件: ")
err := filepath.Walk(path, func(fileNamePath string, f os.FileInfo, err error) error
if strings.Contains(fileNamePath, ".tmpl")
fileinfo := fileInfoChan
fileNamePath: fileNamePath,
jsonType: jsonType,
wg.Add(1)
fileinfochan <- fileinfo
return nil
)
if err != nil
logger.LogS().Errorln(err.Error())
return err
logger.LogS().Debugln("加载(*.tml)文件完成!\\r\\n")
return nil
主要关注如下代码和变量
sync.WaitGroup
使用等待组进行多个任务的同步,等待组可以保证在并发环境中完成指定数量的任务
在 sync.WaitGroup(等待组)类型中,每个 sync.WaitGroup 值在内部维护着一个计数,此计数的初始默认值为零。
var wg sync.WaitGroup
在使用中,我们每往通道里发送一个数据,就是用wg.Add(1)
进行计数加1
,每在通道里消费一个数据,就使用wg.Done()
进行计数减1
。在主函数结束时,通过wg.Wait()
让程序等待通道中不再有发送,且已经消费完数据后再退出,即Wait() 会阻塞代码的运行,直到计数器地值减为0。
sync.Mutex
var mapGuard sync.Mutex
对于需要在多个线程里互斥访问的资源,可以使用sync.Mutex来操作,对应的是Lock和Unlock两个方法就是加锁和解锁。
Go语言中对于map的操作,必须通过互斥锁
var mutex sync.Mutex //互斥锁
func printer(str string)
mutex.Lock() //加锁
defer mutex.Unlock() //在defer语句里解锁,这样就可以保证在函数退出时释放。
for _,ch:=range str
fmt.Printf("%c",ch)
time.Sleep(time.Millisecond*100)
chan
创建chan的结构体,根据实际情况,创建结构体变量
初始化过程中,创建了100个缓存通道,在结束时,要使用close对chan进行关闭
type fileInfoChan struct
jsonType int
fileName string
fileNamePath string
fileinfochan := make(chan fileInfoChan, 100)
close(fileinfochan)
pool
for i := 0; i < poolNumber; i++
go func()
pool就是我们常说的协程池,这里我们来存放具体的任务,在设计中即解析json文件并验证
我们要在主函数中提前设置好建立的pool数量,也就是协程池数
假设我们poolNumber = 20
,也就是协程池数为20个,我们创建二十个线程,作为任务处理的线程
后续任务在chan排队进入线程池运行,消费chan中的任务
chan <-
信息如何进入通道排队,这里我们对文件夹进行遍历,每遍历到一个文件信息,我们就通过 <-
的方式传递到chan中进行排队,等待线程池的消耗
if strings.Contains(fileNamePath, ".tmpl")
fileinfo := fileInfoChan
fileNamePath: fileNamePath,
jsonType: jsonType,
wg.Add(1)
fileinfochan <- fileinfo
以上是关于Go语言 | 协程池的应用(可能是全网最适合小白的教程)的主要内容,如果未能解决你的问题,请参考以下文章