ants源码阅读
Posted 白沙云影
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ants源码阅读相关的知识,希望对你有一定的参考价值。
ants 是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。
1. 工作流程
1.1 创建NewPool
执行过程:
- 获取配置初始化Pool对象
- 开启携程定时清理过期的goworker
func NewPool(size int, options ...Option) (*Pool, error) {
// 1. 配置解析
opts := loadOptions(options...)
if size <= 0 {
size = -1
}
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime
}
if opts.Logger == nil {
opts.Logger = defaultLogger
}
// 2. 初始化pool对象
p := &Pool{
capacity: int32(size),
lock: internal.NewSpinLock(),
options: opts,
}
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p, // 把pool 反过来赋值给goworker
task: make(chan func(), workerChanCap), // size 为 1
}
}
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerArray(loopQueueType, size) // 初始化pool中的workers 对象
} else {
p.workers = newWorkerArray(stackType, 0)
}
p.cond = sync.NewCond(p.lock)
//3. 开启定时任务
go p.purgePeriodically()
return p, nil
}
定时清理任务
func (p *Pool) purgePeriodically() {
heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer heartbeat.Stop()
for range heartbeat.C {
if p.IsClosed() {
break
}
p.lock.Lock()
expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration) // 获取过时的goworkers
p.lock.Unlock()
for i := range expiredWorkers {
expiredWorkers[i].task <- nil // 把过时的goworker task 置空
expiredWorkers[i] = nil
}
if p.Running() == 0 {
p.cond.Broadcast()
}
}
}
1.2 发布任务
- 获取空闲的goworker
- 把任务写入到空闲的goworker的task中
func (p *Pool) Submit(task func()) error {
if p.IsClosed() {
return ErrPoolClosed
}
var w *goWorker
if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload
}
w.task <- task
return nil
}
获取空闲的goworker 逻辑:
graph TD
A[获取空闲的goworker] --> B{p.workers 是否有空闲}
B -->|有| A
B --> |否| C{运行中的goworker是否超过容量}
C --> |否| D[sync.Pool 新建一个goWorker]
C --> |是| E[sync.cond.wait 等待]
D --> |开携程run goworker| F[运行task, 把goworker重新放回到pool.goWorker]
F --> |sync.cond.Signal|E
E --> |没有运行的goworker| D
获取空闲的goworker
func (p *Pool) retrieveWorker() (w *goWorker) {
spawnWorker := func() {
w = p.workerCache.Get().(*goWorker)
w.run()
} // sync.Pool 创建一个goworker
p.lock.Lock()
// 1. 获取空闲的goworkers
w = p.workers.detach()
if w != nil {
p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 {
p.lock.Unlock()
spawnWorker()
} else if p.Running() < capacity {
p.lock.Unlock()
spawnWorker()
} else {
if p.options.Nonblocking {
p.lock.Unlock()
return
}
Reentry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return
}
p.blockingNum++
p.cond.Wait() // 等待
p.blockingNum--
var nw int
if nw = p.Running(); nw == 0 {
p.lock.Unlock()
if !p.IsClosed() {
spawnWorker()
}
return
}
if w = p.workers.detach(); w == nil {
if nw < capacity {
p.lock.Unlock()
spawnWorker()
return
}
goto Reentry
}
p.lock.Unlock()
}
return
}
goworker 运行任务
func (w *goWorker) run() {
w.pool.incRunning()
go func() {
defer func() {
w.pool.decRunning()
w.pool.workerCache.Put(w) // sync.Pool 放回goworker
if p := recover(); p != nil {
if ph := w.pool.options.PanicHandler; ph != nil {
ph(p)
} else {
w.pool.options.Logger.Printf("worker exits from a panic: %v\\n", p)
var buf [4096]byte
n := runtime.Stack(buf[:], false)
w.pool.options.Logger.Printf("worker exits from panic: %s\\n", string(buf[:n]))
}
}
// Call Signal() here in case there are goroutines waiting for available workers.
w.pool.cond.Signal() // 发出信号
}()
for f := range w.task {
if f == nil {
fmt.Println("f is nil")
return
}
f() // 运行 task
if ok := w.pool.revertWorker(w); !ok { // goworker 重新放回pool.workers
return
}
}
}()
}
2. 配置参数设置方法
该解析配置使用的是: 函数式选项模式 。 该方法的好处是配置对象的生成方法具有可扩展性。
type Option func(opts *Options) //核心
func loadOptions(options ...Option) *Options {
opts := new(Options)
for _, option := range options {
option(opts)
}
return opts
}
type Options struct {
ExpiryDuration time.Duration
PreAlloc bool
MaxBlockingTasks int
Nonblocking bool
PanicHandler func(interface{})
Logger Logger
}
// WithOptions accepts the whole options config.
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks
}
}
使用方法:
loadOptions(WithPreAlloc(true), WithMaxBlockingTasks(2))
学习总结:
- 函数式选项模式 的使用方法
ants 包高性能的原因在于:
- 执行携程的对象可以重复使用。
- 使用的lock是重新封装的spinLock,少了自旋部分。简化了sync.mutex的功能,缩短了时间。
- 可以选择使用队列或切片的goworker,提前分配空间。
3.不足之处: goworker 携程不能停止,如果任务阻塞的话,goroutine会得不到释放。可以用context控制。
以上是关于ants源码阅读的主要内容,如果未能解决你的问题,请参考以下文章