[Golang]实现一个带有等待和超时功能的协程池 - 类似Java中的ExecutorService接口实现
Posted dm_vincent
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Golang]实现一个带有等待和超时功能的协程池 - 类似Java中的ExecutorService接口实现相关的知识,希望对你有一定的参考价值。
对于支持CSP并发编程模型的Golang而言,要实现一个协程池是非常简单的。对于习惯了基于线程模型完成并发编程的开发同学,可能初次接触会有点难理解,但是俗话说"书读百遍其义自见",百来行的代码也并不多。
我们的目标是实现一个具有以下特性的协程池(熟悉Java的话,基本上就是实现了ExecutorService接口中的主要方法):
- 能够指定任务队列长度和工作协程的数量使用任务队列
- 能够支持启动和停止
- 能够等待投递到其中的任务执行完毕(等待时间可以指定)
核心的三个数据结构:
- 协程池
- 工作协程
- 任务
直接上代码:
package pool
import (
"sync"
"time"
)
const (
defaultWorkerQueueLength = 10 // 默认工作协程数量
defaultJobQueueLength = 1000 // 默认任务队列长度
)
type Job func()
type TimeoutPool struct
workerQueue chan *worker
jobQueue chan Job
jobCount int
jobRet chan struct
stop chan struct
terminated chan struct
lock sync.Mutex
// 初始化一个带有执行超时时间的协程池,协程数量:10;任务队列长度1000
func NewTimeoutPoolWithDefaults() *TimeoutPool
return NewTimeoutPool(defaultWorkerQueueLength, defaultJobQueueLength)
// 初始化一个带有执行超时时间的协程池,指定worker数量以及任务队列长度
func NewTimeoutPool(workerQueueLen, jobQueueLen int) *TimeoutPool
pool := &TimeoutPool
workerQueue: make(chan *worker, workerQueueLen),
jobQueue: make(chan Job, jobQueueLen),
jobRet: make(chan struct, jobQueueLen),
stop: make(chan struct),
terminated: make(chan struct),
return pool
// 停止协程池运行,如果有正在运行中的任务会等待其运行完毕
func (p *TimeoutPool) Terminate()
p.stop <- struct
// 提交一个任务到协程池
func (p *TimeoutPool) Submit(job Job)
p.jobQueue <- job
p.lock.Lock()
p.jobCount += 1
p.lock.Unlock()
// 启动并等待协程池内的运行全部运行结束 - 如果没有主动停止,如果有任务还在执行中会一直等待
// 如果返回true表示在规定时间范围内成功结束;返回false表示主动停止
// 注意:最终应该只有一个协程来调用Wait等待协程池运行结束,否则其中的计数存在竞态条件问题
func (p *TimeoutPool) StartAndWaitUntilTerminated() bool
// 启动协程池
p.start()
// 等待运行结束
completed := 0
for completed < p.jobCount
select
case <-p.terminated:
return false
default:
select
case <-p.jobRet:
completed += 1
case <-p.terminated:
return false
return true
// 启动并等待协程池内的运行全部运行结束
// 如果返回true表示在规定时间范围内成功结束;返回false表示运行整体超时或者主动停止
// 注意:最终应该只有一个协程来调用Wait等待协程池运行结束,否则其中的计数存在竞态条件问题
func (p *TimeoutPool) StartAndWait(timeout time.Duration) bool
// 启动协程池
p.start()
// 等待运行结束
completed := 0
for completed < p.jobCount
select
case <-p.terminated:
return false
case <-time.After(timeout):
return false
default:
select
case <-p.jobRet:
completed += 1
case <-p.terminated:
return false
case <-time.After(timeout):
return false
return true
// ~~ 内部实现
func (p *TimeoutPool) start()
for i := 0; i < cap(p.workerQueue); i++
newWorker(p.workerQueue, p.jobRet)
go p.dispatch()
func (p *TimeoutPool) dispatch()
for
var job Job
select
case job = <-p.jobQueue:
worker := <-p.workerQueue
worker.jobChannel <- job
case <-p.stop:
for i := 0; i < cap(p.workerQueue); i++
worker := <-p.workerQueue
worker.stop <- struct
<-worker.stop
p.terminated <- struct
return
type worker struct
workerQueue chan *worker
jobChannel chan Job
jobRet chan struct
stop chan struct
func (w *worker) start()
go func()
for
w.workerQueue <- w
var job Job
select
case job = <-w.jobChannel:
job()
w.jobRet <- struct
case <-w.stop:
w.stop <- struct
return
()
func newWorker(workerQueue chan *worker, jobRet chan struct) *worker
worker := &worker
workerQueue: workerQueue,
jobChannel: make(chan Job),
jobRet: jobRet,
stop: make(chan struct),
worker.start()
return worker
值得说明的一些细节:
- 提交任务时使用一个锁来保证多个协程并发提交任务时,总任务数能够正确计数
- channel发送一个信号后等待反馈:
worker.stop <- struct // 给工作协程发送一个stop信号
<-worker.stop // 等待工作协程成功stop后的反馈信号
- 等待执行结束的方法中多级select体现接收信号的优先级关系(源自多情况同时满足时,select会随机选择一个的语言特性):
func (p *TimeoutPool) StartAndWait(timeout time.Duration) bool
// 启动协程池
p.start()
// 等待运行结束
completed := 0
for completed < p.jobCount
// 外层的select优先级高,当有任务完成的信号和terminate信号同时出现时,优先选择terminate信号
select
case <-p.terminated:
return false
case <-time.After(timeout):
return false
default:
// 内层的select优先级低,能够同时处理三种支持的信号
select
case <-p.jobRet:
completed += 1
case <-p.terminated:
return false
case <-time.After(timeout):
return false
return true
- 停止协程池时,需要对所有的工作协程发出stop信号:
for i := 0; i < cap(p.workerQueue); i++
worker := <-p.workerQueue
worker.stop <- struct
<-worker.stop
注意这里的遍历方式,而不是for…range语法。使用range语法可能不是遍历所有工作协程,当工作协程处于执行中的状态时,按照上述实现它是不会被遍历到的
以上是关于[Golang]实现一个带有等待和超时功能的协程池 - 类似Java中的ExecutorService接口实现的主要内容,如果未能解决你的问题,请参考以下文章