Go组件学习——手写连接池并没有那么简单

Posted JackieZheng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go组件学习——手写连接池并没有那么简单相关的知识,希望对你有一定的参考价值。


1、背景

前段时间在看gorm,发现gorm是复用database/sql的连接池。

于是翻了下database/sql的数据库连接池的代码实现,看完代码,好像也不是很复杂,但是总觉得理解不够深刻,于是萌生了自己想写个连接池的想法。(最后也验证了,看源码的理解确实不够深刻,一看就会,一做就跪)

2、连接池的实现原理

什么是连接池

  • 顾名思义是一个池子

  • 池子里面存放有限数量即时可用的连接,减少创建连接和关闭连接的时间

  • 连接是有存活时间的

具体到数据库连接池,我根据自己的理解画了一张获取连接的流程图


从上图我们可以看出,除了连接池的容量大小,我们还有一个最大连接数的限制。池子里的连接让我们不用频繁的创建和关闭连接,同时应该也要有最大连接的限制,避免无限制的创建连接导致服务器资源耗尽,拖垮服务不可用。

池子中的连接也有存活时间,如果超过存活时间则会销毁连接。

3、实现连接池我们需要考虑哪些问题

3.1 功能点

  • 获取连接

  • 释放连接

  • Ping

  • 关闭连接池

  • 设置最大连接数和连接池容量(连接存活时间等等)

3.2 实现细节

  • 连接应该有哪些属性,比如最大连接数、连接池容量、连接创建时间和存活时间

  • 如何模拟使用连接池以及超过最大连接数后等待其他连接释放

  • 如何保证在多协程操作下数据的一致性

  • 如果实现连接的超时监听和通知

4、具体实现

这里的连接池实现包括

  • 设置最大连接数和连接池容量

  • 获取连接

  • 释放连接

4.1 结构定义

定义Conn结构体,这里包含了几乎所有的有关连接需要的信息属性

 
   
   
 
  1. type Conn struct {

  2. maxConn int // 最大连接数

  3. maxIdle int // 最大可用连接数

  4. freeConn int // 线程池空闲连接数

  5. connPool []int // 连接池

  6. openCount int // 已经打开的连接数

  7. waitConn map[int]chan Permission // 排队等待的连接队列

  8. waitCount int // 等待个数

  9. lock sync.Mutex // 锁

  10. nextConnIndex NextConnIndex // 下一个连接的ID标识(用于区分每个ID)

  11. freeConns map[int]Permission // 连接池的连接

  12. }

这里并不会创建一个真正的数据库连接,而是使用一个非空的Permission表示拿到了连接。拿到一个非空的Permission才有资格执行后面类似增删改查的操作。

Permission对应的结构体如下

 
   
   
 
  1. type Permission struct {

  2. NextConnIndex // 对应Conn中的NextConnIndex

  3. Content string // 通行证的具体内容,比如"PASSED"表示成功获取

  4. CreatedAt time.Time // 创建时间,即连接的创建时间

  5. MaxLifeTime time.Duration // 连接的存活时间,本次没有用到这个属性,保留

  6. }

NextConnIndex对应的结构体如下

 
   
   
 
  1. type NextConnIndex struct {

  2. Index int

  3. }

还有一个用来设置最大连接数以及连接池最大连接数的Config

 
   
   
 
  1. type Config struct {

  2. MaxConn int

  3. MaxIdle int

  4. }

4.2 初始化连接池参数

 
   
   
 
  1. func Prepare(ctx context.Context, config *Config) (conn *Conn) {

  2. // go func() {

  3. //for {

  4. //conn.expiredCh = make(chan string, len(conn.freeConns))

  5. //for _, value := range conn.freeConns {

  6. // if value.CreatedAt.Add(value.MaxLifeTime).Before(nowFunc()) {

  7. // conn.expiredCh <- "CLOSE"

  8. // }

  9. //}

  10. // }()

  11. return &Conn{

  12. maxConn: config.MaxConn,

  13. maxIdle: config.MaxIdle,

  14. openCount: 0,

  15. connPool: []int{},

  16. waitConn: make(map[int]chan Permission),

  17. waitCount: 0,

  18. freeConns: make(map[int]Permission),

  19. }

  20. }

这里主要是初始化上面的Conn结构体参数。

注释的部分,主要想通过启动一个监听协程,用于监听已经过期的连接,并通过channel发送。(这块还有一些细节没有想清楚,先搁置)

4.3 设置MaxConn和MaxIdle

在main.go中添加代码

 
   
   
 
  1. ctx := context.Background()

  2. config := &custom_pool.Config{

  3. MaxConn: 2,

  4. MaxIdle: 1,

  5. }

这里意味连接池只能缓存一个连接,最大新建连接数为2,超过则要加入等待队列。

4.4 获取连接

 
   
   
 
  1. // 创建连接

  2. func (conn *Conn) New(ctx context.Context) (permission Permission, err error) {

  3. /**

  4. 1、如果当前连接池已满,即len(freeConns)=0

  5. 2、判定openConn是否大于maxConn,如果大于,则丢弃获取加入队列进行等待

  6. 3、如果小于,则考虑创建新连接

  7. */

  8. conn.lock.Lock()


  9. select {

  10. default:

  11. case <-ctx.Done(): // context取消或超时,则退出

  12. conn.lock.Unlock()


  13. return Permission{}, errors.New("new conn failed, context cancelled!")

  14. }


  15. // 连接池不为空,从连接池获取连接

  16. if len(conn.freeConns) > 0 {

  17. var (

  18. popPermission Permission

  19. popReqKey int

  20. )


  21. // 获取其中一个连接

  22. for popReqKey, popPermission = range conn.freeConns {

  23. break

  24. }

  25. // 从连接池删除

  26. delete(conn.freeConns, popReqKey)

  27. fmt.Println("log", "use free conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)

  28. conn.lock.Unlock()

  29. return popPermission, nil

  30. }


  31. if conn.openCount >= conn.maxConn { // 当前连接数大于上限,则加入等待队列

  32. nextConnIndex := getNextConnIndex(conn)


  33. req := make(chan Permission, 1)

  34. conn.waitConn[nextConnIndex] = req

  35. conn.waitCount++

  36. conn.lock.Unlock()


  37. select {

  38. // 如果在等待指定超时时间后,仍然无法获取释放连接,则放弃获取连接,这里如果不在超时时间后退出会一直阻塞

  39. case <-time.After(time.Second * time.Duration(3)):

  40. fmt.Println("超时,通知主线程退出")

  41. return

  42. case ret, ok := <-req: // 有放回的连接, 直接拿来用

  43. if !ok {

  44. return Permission{}, errors.New("new conn failed, no available conn release")

  45. }

  46. fmt.Println("log", "received released conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)

  47. return ret, nil

  48. }

  49. return Permission{}, errors.New("new conn failed")

  50. }


  51. // 新建连接

  52. conn.openCount++

  53. conn.lock.Unlock()

  54. permission = Permission{NextConnIndex: NextConnIndex{nextConnIndex},

  55. Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}

  56. fmt.Println("log", "create conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)

  57. return permission, nil

  58. }

这里主要分为三个部分


  • 如果连接池不为空,则直接从池子里面获取连接使用即可



  • 如果连接池为空,且当前的连接数已经超过最大连接数maxConn,则会将当前任务加入等待队列,同时监听是否有释放的可用连接,如果有则拿来直接用,如果超过指定等待时间后仍然取不到连接则退出阻塞返回。



  • 如果连接池为空,且尚未达到最大连接数maxConn,则新建一个新连接。


getNextConnIndex函数

 
   
   
 
  1. func getNextConnIndex(conn *Conn) int {

  2. currentIndex := conn.nextConnIndex.Index

  3. conn.nextConnIndex.Index = currentIndex + 1

  4. return conn.nextConnIndex.Index

  5. }

4.5 释放连接

 
   
   
 
  1. // 释放连接

  2. func (conn *Conn) Release(ctx context.Context) (result bool, err error) {

  3. conn.lock.Lock()

  4. // 如果等待队列有等待任务,则通知正在阻塞等待获取连接的进程(即New方法中"<-req"逻辑)

  5. // 这里没有做指定连接的释放,只是保证释放的连接会被利用起来

  6. if len(conn.waitConn) > 0 {

  7. var req chan Permission

  8. var reqKey int

  9. for reqKey, req = range conn.waitConn {

  10. break

  11. }

  12. // 假定释放的连接就是下面新建的连接

  13. permission := Permission{NextConnIndex: NextConnIndex{reqKey},

  14. Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}

  15. req <- permission

  16. conn.waitCount--

  17. delete(conn.waitConn, reqKey)

  18. conn.lock.Unlock()

  19. } else {

  20. if conn.openCount > 0 {

  21. conn.openCount--


  22. if len(conn.freeConns) < conn.maxIdle { // 确保连接池大小不会超过maxIdle

  23. nextConnIndex := getNextConnIndex(conn)

  24. permission := Permission{NextConnIndex: NextConnIndex{nextConnIndex},

  25. Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}

  26. conn.freeConns[nextConnIndex] = permission

  27. }

  28. }

  29. conn.lock.Unlock()

  30. }

  31. return

  32. }

这里主要分为两部分

  • 如果释放连接的时候发现等待队列有任务在等待,则将释放的连接通过channel发送,给正在等待连接释放的阻塞任务使用,同时从等待队列中删除该任务。

  • 如果当前无等待任务,则将连接放入连接池

这里的nowFunc

 
   
   
 
  1. var nowFunc = time.Now

5、Case模拟

5.1 无释放创建连接

即只有创建连接,拿到连接也不会释放连接

 
   
   
 
  1. package main


  2. import (

  3. "context"

  4. custom_pool "go-demo/main/src/custom-pool"

  5. )


  6. func main() {


  7. ctx := context.Background()

  8. config := &custom_pool.Config{

  9. MaxConn: 2,

  10. MaxIdle: 1,

  11. }

  12. conn := custom_pool.Prepare(ctx, config)

  13. if _, err := conn.New(ctx); err != nil {

  14. return

  15. }

  16. if _, err := conn.New(ctx); err != nil {

  17. return

  18. }

  19. if _, err := conn.New(ctx); err != nil {

  20. return

  21. }

  22. if _, err := conn.New(ctx); err != nil {

  23. return

  24. }

  25. if _, err := conn.New(ctx); err != nil {

  26. return

  27. }

  28. }

执行结果如下

注意上面代码都是一直在获取连接,在获取连接后没有释放连接。

第一次获取,连接池为空,则新建连接

第二次获取,连接池为空,继续新建连接

第三次获取,连接池为空,同时已有连接数>=maxConn,所以会阻塞等待释放连接,但是因为没有连接释放,所以一直等待,直到3秒超时后退出。

所以第三次、第四次和第五次都是超时退出

5.2 释放连接

如果我们释放连接会怎么样,我们可以通过新启一个协程用于释放一个连接如下

 
   
   
 
  1. package main


  2. import (

  3. "context"

  4. custom_pool "go-demo/main/src/custom-pool"

  5. )


  6. func main() {


  7. ctx := context.Background()

  8. config := &custom_pool.Config{

  9. MaxConn: 2,

  10. MaxIdle: 1,

  11. }

  12. conn := custom_pool.Prepare(ctx, config)

  13. if _, err := conn.New(ctx); err != nil {

  14. return

  15. }

  16. if _, err := conn.New(ctx); err != nil {

  17. return

  18. }

  19. go conn.Release(ctx)

  20. if _, err := conn.New(ctx); err != nil {

  21. return

  22. }

  23. if _, err := conn.New(ctx); err != nil {

  24. return

  25. }

  26. if _, err := conn.New(ctx); err != nil {

  27. return

  28. }

  29. }

执行结果如下

 
   
   
 
  1. log create conn!!!!! openCount: 1 freeConns: map[]

  2. log create conn!!!!! openCount: 2 freeConns: map[]

  3. log received released conn!!!!! openCount: 2 freeConns: map[]

  4. 超时,通知主线程退出

  5. 超时,通知主线程退出

前两次和上面一样,但是第三次获取的时候,会收到一个释放的连接,所以可以直接复用释放的连接返回。

但是第四次和第五次创建,因为没有释放的连接,所以都会因为等待超时后退出。

5.3 使用连接池

上面的两个case是在MaxConn=2,MaxIdle=1的情况下执行的。

下面我们看看如果基于以上两个参数设定,模拟出正好使用连接池的情况。

 
   
   
 
  1. package main


  2. import (

  3. "context"

  4. custom_pool "go-demo/main/src/custom-pool"

  5. )


  6. func main() {


  7. ctx := context.Background()

  8. config := &custom_pool.Config{

  9. MaxConn: 2,

  10. MaxIdle: 1,

  11. }

  12. conn := custom_pool.Prepare(ctx, config)

  13. if _, err := conn.New(ctx); err != nil {

  14. return

  15. }

  16. go conn.Release(ctx)

  17. if _, err := conn.New(ctx); err != nil {

  18. return

  19. }

  20. go conn.Release(ctx)

  21. if _, err := conn.New(ctx); err != nil {

  22. return

  23. }

  24. go conn.Release(ctx)

  25. if _, err := conn.New(ctx); err != nil {

  26. return

  27. }

  28. go conn.Release(ctx)

  29. if _, err := conn.New(ctx); err != nil {

  30. return

  31. }

  32. }

即除了第一次,后面都会有连接释放。

执行结果可能情况如下

 
   
   
 
  1. log create conn!!!!! openCount: 1 freeConns: map[]

  2. log create conn!!!!! openCount: 2 freeConns: map[]

  3. log use free conn!!!!! openCount: 1 freeConns: map[]

  4. log use free conn!!!!! openCount: 0 freeConns: map[]

  5. log create conn!!!!! openCount: 1 freeConns: map[]

从执行结果可以看出,这里有两次使用了连接池中的连接。

注意:因为释放是新启协程执行,所以无法保证执行顺序,不同的执行顺序,会有不同的执行结果。上面只是执行结果的一种。

以上完整代码参见https://github.com/DMinerJackie/go-demo/tree/master/main/src/custom-pool

6、总结和展望

6.1 总结

  • 通过手写连接池加深对于连接池实现的理解

  • 学会使用channel和协程

  • 学会如何在channel阻塞指定时间后退出(设立超时时间)

  • 学会对于共享资源加锁,比如nextConnIndex的获取和更新需要加锁

6.2 展望

  • Close和Ping没有写(实现不难)

  • 连接池连接需要有存活时间,并在连接过期的时候从连接池删除

  • 实现使用的是普通的map集合,可以考虑并发安全的syncMap

  • 代码实现比较简陋不够优雅,可以继续完善保证职责单一



以上是关于Go组件学习——手写连接池并没有那么简单的主要内容,如果未能解决你的问题,请参考以下文章

Go 中的简单工作池

Go组件学习——cron定时器

手写数字识别——基于全连接层和MNIST数据集

golang代码片段(摘抄)

Go组件学习——database/sql数据库连接池你用对了吗

Java集合相关学习——手写一个简单的Map接口实现类(HashMap)