ants源码阅读

Posted 白沙云影

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ants源码阅读相关的知识,希望对你有一定的参考价值。

ants 是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。

1. 工作流程

1.1 创建NewPool

执行过程:

  1. 获取配置初始化Pool对象
  2. 开启携程定时清理过期的goworker
func NewPool(size int, options ...Option) (*Pool, error) {
    // 1. 配置解析
    opts := loadOptions(options...) 

    if size <= 0 {
        size = -1
    }

    if expiry := opts.ExpiryDuration; expiry < 0 {
        return nil, ErrInvalidPoolExpiry
    } else if expiry == 0 {
        opts.ExpiryDuration = DefaultCleanIntervalTime
    }

    if opts.Logger == nil {
        opts.Logger = defaultLogger
    }
    // 2. 初始化pool对象
    p := &Pool{
        capacity: int32(size),
        lock:     internal.NewSpinLock(),
        options:  opts,
    }
    p.workerCache.New = func() interface{} {
        return &goWorker{
            pool: p,   // 把pool 反过来赋值给goworker
            task: make(chan func(), workerChanCap), // size 为 1 
        }
    }
    if p.options.PreAlloc {
        if size == -1 {
            return nil, ErrInvalidPreAllocSize
        }
        p.workers = newWorkerArray(loopQueueType, size) // 初始化pool中的workers 对象
    } else {
        p.workers = newWorkerArray(stackType, 0)
    }

    p.cond = sync.NewCond(p.lock)
   //3. 开启定时任务
    go p.purgePeriodically()

    return p, nil
}

定时清理任务

func (p *Pool) purgePeriodically() {
    heartbeat := time.NewTicker(p.options.ExpiryDuration)
    defer heartbeat.Stop()

    for range heartbeat.C {
        if p.IsClosed() {
            break
        }

        p.lock.Lock()
        expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration) // 获取过时的goworkers 
        p.lock.Unlock()

        for i := range expiredWorkers {
            expiredWorkers[i].task <- nil  // 把过时的goworker task 置空
            expiredWorkers[i] = nil 
        }
        
        if p.Running() == 0 {
            p.cond.Broadcast()
        }
    }
}

1.2 发布任务

  1. 获取空闲的goworker
  2. 把任务写入到空闲的goworker的task中
func (p *Pool) Submit(task func()) error {
    if p.IsClosed() {
        return ErrPoolClosed
    }
    var w *goWorker
    if w = p.retrieveWorker(); w == nil {
        return ErrPoolOverload
    }
    w.task <- task
    return nil
}

获取空闲的goworker 逻辑:

graph TD
    A[获取空闲的goworker] --> B{p.workers 是否有空闲}
    B -->|有| A
    B --> |否| C{运行中的goworker是否超过容量}
    C --> |否| D[sync.Pool 新建一个goWorker]
    C --> |是| E[sync.cond.wait 等待]
    D --> |开携程run goworker| F[运行task, 把goworker重新放回到pool.goWorker]
    F --> |sync.cond.Signal|E
    E --> |没有运行的goworker| D 

获取空闲的goworker

func (p *Pool) retrieveWorker() (w *goWorker) {
    spawnWorker := func() {
        w = p.workerCache.Get().(*goWorker)
        w.run()
    }  // sync.Pool 创建一个goworker

    p.lock.Lock()
   // 1. 获取空闲的goworkers 
    w = p.workers.detach() 
    if w != nil {
        p.lock.Unlock()
    } else if capacity := p.Cap(); capacity == -1 {
        p.lock.Unlock()
        spawnWorker()
    } else if p.Running() < capacity {
        p.lock.Unlock()
        spawnWorker()
    } else {
        if p.options.Nonblocking {
            p.lock.Unlock()
            return
        }
    Reentry:
        if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
            p.lock.Unlock()
            return
        }
        p.blockingNum++
        p.cond.Wait() // 等待
        p.blockingNum--
        var nw int
        if nw = p.Running(); nw == 0 {
            p.lock.Unlock()
            if !p.IsClosed() {
                spawnWorker()
            }
            return
        }
        if w = p.workers.detach(); w == nil {
            if nw < capacity {
                p.lock.Unlock()
                spawnWorker()
                return
            }
            goto Reentry
        }

        p.lock.Unlock()
    }
    return
}

goworker 运行任务

func (w *goWorker) run() {
    w.pool.incRunning()
    go func() {
        defer func() {
            w.pool.decRunning()
            w.pool.workerCache.Put(w) // sync.Pool 放回goworker
            if p := recover(); p != nil {
                if ph := w.pool.options.PanicHandler; ph != nil {
                    ph(p)
                } else {
                    w.pool.options.Logger.Printf("worker exits from a panic: %v\\n", p)
                    var buf [4096]byte
                    n := runtime.Stack(buf[:], false)
                    w.pool.options.Logger.Printf("worker exits from panic: %s\\n", string(buf[:n]))
                }
            }
            // Call Signal() here in case there are goroutines waiting for available workers.
            w.pool.cond.Signal() // 发出信号
        }()
        for f := range w.task {
            if f == nil {
                fmt.Println("f is nil")
                return
            }
            f()   // 运行 task 
            if ok := w.pool.revertWorker(w); !ok { // goworker 重新放回pool.workers
                return
            }
        }
    }()
}

2. 配置参数设置方法

该解析配置使用的是: 函数式选项模式 。 该方法的好处是配置对象的生成方法具有可扩展性。

type Option func(opts *Options) //核心

func loadOptions(options ...Option) *Options {
    opts := new(Options)
    for _, option := range options {
        option(opts)
    }
    return opts
}

type Options struct {

    ExpiryDuration time.Duration
    PreAlloc bool
    MaxBlockingTasks int
    Nonblocking bool
    PanicHandler func(interface{})
    Logger Logger
}

// WithOptions accepts the whole options config.
func WithOptions(options Options) Option {
    return func(opts *Options) {
        *opts = options
    }
}



func WithExpiryDuration(expiryDuration time.Duration) Option {
    return func(opts *Options) {
        opts.ExpiryDuration = expiryDuration
    }
}

func WithPreAlloc(preAlloc bool) Option {
    return func(opts *Options) {
        opts.PreAlloc = preAlloc
    }
}

func WithMaxBlockingTasks(maxBlockingTasks int) Option {
    return func(opts *Options) {
        opts.MaxBlockingTasks = maxBlockingTasks
    }
}

使用方法:

loadOptions(WithPreAlloc(true), WithMaxBlockingTasks(2))

学习总结:

  1. 函数式选项模式 的使用方法
  2. ants 包高性能的原因在于:

    1. 执行携程的对象可以重复使用。
    2. 使用的lock是重新封装的spinLock,少了自旋部分。简化了sync.mutex的功能,缩短了时间。
    3. 可以选择使用队列或切片的goworker,提前分配空间。

3.不足之处: goworker 携程不能停止,如果任务阻塞的话,goroutine会得不到释放。可以用context控制。

以上是关于ants源码阅读的主要内容,如果未能解决你的问题,请参考以下文章

tomcat 8.0.x源码阅读系列:源代码运行环境搭建

Python代码阅读(第19篇):合并多个字典

为 tunny 提交的一次 PR

Python代码阅读(第26篇):将列表映射成字典

如何进行 Java 代码阅读分析?

Python代码阅读(第41篇):矩阵转置