[Golang]实现一个带有等待和超时功能的协程池 - 类似Java中的ExecutorService接口实现

Posted dm_vincent

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Golang]实现一个带有等待和超时功能的协程池 - 类似Java中的ExecutorService接口实现相关的知识,希望对你有一定的参考价值。

对于支持CSP并发编程模型的Golang而言,要实现一个协程池是非常简单的。对于习惯了基于线程模型完成并发编程的开发同学,可能初次接触会有点难理解,但是俗话说"书读百遍其义自见",百来行的代码也并不多。

我们的目标是实现一个具有以下特性的协程池(熟悉Java的话,基本上就是实现了ExecutorService接口中的主要方法):

  1. 能够指定任务队列长度和工作协程的数量使用任务队列
  2. 能够支持启动和停止
  3. 能够等待投递到其中的任务执行完毕(等待时间可以指定)

核心的三个数据结构:

  1. 协程池
  2. 工作协程
  3. 任务

直接上代码:

package pool

import (
   "sync"
   "time"
)

const (
   defaultWorkerQueueLength = 10   // 默认工作协程数量
   defaultJobQueueLength    = 1000 // 默认任务队列长度
)

type Job func()

type TimeoutPool struct 
   workerQueue chan *worker
   jobQueue    chan Job
   jobCount    int
   jobRet      chan struct
   stop        chan struct
   terminated  chan struct
   lock        sync.Mutex


// 初始化一个带有执行超时时间的协程池,协程数量:10;任务队列长度1000
func NewTimeoutPoolWithDefaults() *TimeoutPool 
   return NewTimeoutPool(defaultWorkerQueueLength, defaultJobQueueLength)


// 初始化一个带有执行超时时间的协程池,指定worker数量以及任务队列长度
func NewTimeoutPool(workerQueueLen, jobQueueLen int) *TimeoutPool 
   pool := &TimeoutPool
      workerQueue: make(chan *worker, workerQueueLen),
      jobQueue:    make(chan Job, jobQueueLen),
      jobRet:      make(chan struct, jobQueueLen),
      stop:        make(chan struct),
      terminated:  make(chan struct),
   

   return pool


// 停止协程池运行,如果有正在运行中的任务会等待其运行完毕
func (p *TimeoutPool) Terminate() 
   p.stop <- struct


// 提交一个任务到协程池
func (p *TimeoutPool) Submit(job Job) 
   p.jobQueue <- job

   p.lock.Lock()
   p.jobCount += 1
   p.lock.Unlock()


// 启动并等待协程池内的运行全部运行结束 - 如果没有主动停止,如果有任务还在执行中会一直等待
// 如果返回true表示在规定时间范围内成功结束;返回false表示主动停止
// 注意:最终应该只有一个协程来调用Wait等待协程池运行结束,否则其中的计数存在竞态条件问题
func (p *TimeoutPool) StartAndWaitUntilTerminated() bool 
   // 启动协程池
   p.start()

   // 等待运行结束
   completed := 0
   for completed < p.jobCount 
      select       
      case <-p.terminated:
         return false
      default:
         select 
         case <-p.jobRet:
            completed += 1
         case <-p.terminated:
            return false
         
      
   

   return true


// 启动并等待协程池内的运行全部运行结束
// 如果返回true表示在规定时间范围内成功结束;返回false表示运行整体超时或者主动停止
// 注意:最终应该只有一个协程来调用Wait等待协程池运行结束,否则其中的计数存在竞态条件问题
func (p *TimeoutPool) StartAndWait(timeout time.Duration) bool 
   // 启动协程池
   p.start()

   // 等待运行结束
   completed := 0
   for completed < p.jobCount 
      select 
      case <-p.terminated:
         return false
      case <-time.After(timeout):
         return false
      default:
         select 
         case <-p.jobRet:
            completed += 1
         case <-p.terminated:
            return false
         case <-time.After(timeout):
            return false
         
      
   

   return true


// ~~ 内部实现

func (p *TimeoutPool) start() 
   for i := 0; i < cap(p.workerQueue); i++ 
      newWorker(p.workerQueue, p.jobRet)
   

   go p.dispatch()


func (p *TimeoutPool) dispatch() 
   for 
      var job Job
      select 
      case job = <-p.jobQueue:
         worker := <-p.workerQueue
         worker.jobChannel <- job
      case <-p.stop:
         for i := 0; i < cap(p.workerQueue); i++ 
            worker := <-p.workerQueue
            worker.stop <- struct
            <-worker.stop
         
         p.terminated <- struct
         return
      
   


type worker struct 
   workerQueue chan *worker
   jobChannel  chan Job
   jobRet      chan struct
   stop        chan struct


func (w *worker) start() 
   go func() 
      for 
         w.workerQueue <- w
         var job Job
         select 
         case job = <-w.jobChannel:
            job()
            w.jobRet <- struct
         case <-w.stop:
            w.stop <- struct
            return
         
      
   ()


func newWorker(workerQueue chan *worker, jobRet chan struct) *worker 
   worker := &worker
      workerQueue: workerQueue,
      jobChannel:  make(chan Job),
      jobRet:      jobRet,
      stop:        make(chan struct),
   

   worker.start()
   return worker

值得说明的一些细节:

  1. 提交任务时使用一个锁来保证多个协程并发提交任务时,总任务数能够正确计数
  2. channel发送一个信号后等待反馈:
worker.stop <- struct  // 给工作协程发送一个stop信号
<-worker.stop              // 等待工作协程成功stop后的反馈信号
  1. 等待执行结束的方法中多级select体现接收信号的优先级关系(源自多情况同时满足时,select会随机选择一个的语言特性):
func (p *TimeoutPool) StartAndWait(timeout time.Duration) bool 
   // 启动协程池
   p.start()

   // 等待运行结束
   completed := 0
   for completed < p.jobCount 
      // 外层的select优先级高,当有任务完成的信号和terminate信号同时出现时,优先选择terminate信号
      select 
      case <-p.terminated:
         return false
      case <-time.After(timeout):
         return false
      default:
         // 内层的select优先级低,能够同时处理三种支持的信号
         select 
         case <-p.jobRet:
            completed += 1
         case <-p.terminated:
            return false
         case <-time.After(timeout):
            return false
         
      
   

   return true

  1. 停止协程池时,需要对所有的工作协程发出stop信号:
for i := 0; i < cap(p.workerQueue); i++ 
    worker := <-p.workerQueue
    worker.stop <- struct
    <-worker.stop
 

注意这里的遍历方式,而不是for…range语法。使用range语法可能不是遍历所有工作协程,当工作协程处于执行中的状态时,按照上述实现它是不会被遍历到的

以上是关于[Golang]实现一个带有等待和超时功能的协程池 - 类似Java中的ExecutorService接口实现的主要内容,如果未能解决你的问题,请参考以下文章

[Golang]实现一个带有等待和超时功能的协程池 - 类似Java中的ExecutorService接口实现

深入浅出Golang的协程池设计

fasthttp中的协程池实现

Go协程与协程池

Go协程与协程池

golang协程调度模式解密