golang 允许连接池的抽象包装器
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang 允许连接池的抽象包装器相关的知识,希望对你有一定的参考价值。
golang sql 包连接池分析
golang 在使用 mysql 的时候会用到 database/sql 这个库,每次都在黑盒使用它,有必要来梳理一下整个请求流程和细节,以免以后碰到问题会有解决的思路。
阅读之前的几个问题
- sql 的连接池的连接怎么维护的?
- Query / Exec 如何获取查询的连接?
- 连接池的连接如何释放的?
几个重要的结构
DB struct
先来看看 DB 结构,该结构是 sql 包的核心结构。DB 是表示零个或多个底层连接池的数据库句柄,是并发安全的。
sql 包可以自动创建和释放连接,它还维护一个空闲连接池。如果数据库具有每个连接状态的概念,则只能在事务中可靠地观察到这种状态。调用 DB.Begin 后,返回的 Tx 将绑定到单个连接。在事务上调用 Commit 或 Rollback 后,该事务的连接将返回到 DB 的空闲连接池。SetMaxIdleConns 用来控制连接池大小。
type DB struct { driver driver.Driver // 数据库驱动 dsn string // 数据库连接参数,比如 username,hostname,password 等等 numClosed uint64 // numClosed 是一个原子计数器,表示已关闭连接的总数。Stmt.openStmt 在清除 Stmt.css 中的已关闭连接之前对其进行检查。 mu sync.Mutex // 保护下面的字段 freeConn []*driverConn // 空闲连接 connRequests map[uint64]chan connRequest // 阻塞请求队列。当达到最大连接数时,后续请求将插入该队列来等待可用连接 nextRequest uint64 // connRequests 的下一个 key numOpen int // 已连接或者正等待连接的数量 // 一个创建新连接的信号, // 运行connectionOpener()的goroutine读取此chan,maybeOpenNewConnections发送此chan(每个需要的连接发送一次) // 它在db.Close()时关闭,并通知connectionOpener goroutine退出。 openerCh chan struct{} closed bool dep map[finalCloser]depSet lastPut map[*driverConn]string // 用于 debug maxIdle int // 最大空闲连接数, 0等价于 defaultMaxIdleConns 常量(代码中值为2),负数等价于0 maxOpen int // 数据库的最大连接数,0 等价于不限制最大连接数 maxLifetime time.Duration // 连接的最大生命周期 cleanerCh chan struct{} // 用于释放连接池中过期的连接的信号 }
driverConn struct
driverConn 使用互斥锁封装一个 driver.Conn 结构,在所有对 Conn 的调用期间保持(包括对通过该 Conn 返回的接口的任何调用,例如对 Tx,Stmt,Result,Rows 的调用)
type driverConn struct { db *DB createdAt time.Time sync.Mutex // 保护下面的字段 ci driver.Conn closed bool finalClosed bool // ci.Close 已经被调用则为 true openStmt map[*driverStmt]bool // 下面的字段被 db.mu 保护 inUse bool onPut []func() // 下次返回 conn 时运行的代码 dbmuClosed bool // 与 closed 字段相同,但由 db.mu 保护,用于 removeClosedStmtLocked } // driver.Conn 是具体的接口 用来支持不同的数据库 // Conn 是与数据库的连接,不是 gotoutines 安全的。 // Conn 被认为是有状态的。 type Conn interface { // Prepare 返回绑定到该连接的就绪语句 Stmt。 Prepare(query string) (Stmt, error) // Close 使当前就绪的语句和事务无效并可能停止,将此连接标记为不再使用。 // // 因为sql包维护一个空闲的连接池,并且只有在空闲连接过剩时才调用Close,所以驱动不需要做自己的连接缓存。 Close() error // Begin 启动并返回一个新的事务 Tx Begin() (Tx, error) } // 可以看到driverConn的这个方法,看名字就知道是释放连接的 调用了DB 的 putConn 方法,这里先留个印象 func (dc *driverConn) releaseConn(err error) { dc.db.putConn(dc, err) }
驱动注册绑定
我们在使用指定数据库时需要使用import _ "github.com/go-sql-driver/mysql"
来执行 init() 函数。这个 init() 函数主要用来将指定的数据库驱动注册到 sql 的 一个 map 类型的 drivers 变量中。
mysql/driver.go
// 该方法注册到驱动,也就是db.Open的调用,返回的mc是实现的driver.Conn接口的结构,dsn 为连接该数据库的配置 func (d MySQLDriver) Open(dsn string) (driver.Conn, error) { // New mysqlConn mc := &mysqlConn{ maxAllowedPacket: maxPacketSize, maxWriteSize: maxPacketSize - 1, closech: make(chan struct{}), } ... return mc, nil } // 注册驱动 MySQLDriver 结构实现了 driver.Driver 接口 func init() { sql.Register("mysql", &MySQLDriver{}) }
连接
创建连接的过程
可以看到,调用sql.Open的时候会启动一个 goroutine 一直阻塞读取db.openerCh
。当这个openerCh收到信号时,会启动创建连接的流程,调用驱动提供的创建连接的方法创建连接。如果创建成功,优先把改连接给db.connRequests
中阻塞的请求使用,如果没有阻塞的请求就把这个新连接放入 db.freeConn
中待请求使用。
关键方法
// 调用驱动的 Open 方法创建新连接 func (db *DB) openNewConnection() { // 创建新连接 ci, err := db.driver.Open(db.dsn) db.mu.Lock() defer db.mu.Unlock() if db.closed { if err == nil { ci.Close() } db.numOpen-- return } if err != nil { db.numOpen-- db.putConnDBLocked(nil, err) db.maybeOpenNewConnections() return } dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, } // 直接给阻塞的请求使用 或者 放入连接池 if db.putConnDBLocked(dc, err) { db.addDepLocked(dc, dc) } else { db.numOpen-- ci.Close() } } // 给 阻塞在 connRequest 队列的请求分配连接 func (db *DB) putConnDBLocked(dc *driverConn, err error) bool { if db.closed { return false } // 如果超过最大连接,直接返回false,connRequest 队列的请求继续阻塞 if db.maxOpen > 0 && db.numOpen > db.maxOpen { return false } // 把连接分配给 connRequests 阻塞的请求 if c := len(db.connRequests); c > 0 { var req chan connRequest var reqKey uint64 // 取第一个 for reqKey, req = range db.connRequests { break } // 阻塞的请求得到连接 删除 connRequests 的记录 delete(db.connRequests, reqKey) // 标记该连接正在使用 if err == nil { dc.inUse = true } // 通过 chan 把该连接发送给请求 req <- connRequest{ conn: dc, err: err, } return true // 如果空闲连接数小于最大连接限制 把该连接放到 freeConn 中 } else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) { db.freeConn = append(db.freeConn, dc) // 根据 db.maxLifetime 起一个 goroutine 清除freeConn中过期的连接 db.startCleanerLocked() return true } return false }
查询如何获取连接
先来看提供的两个基本的查询的方法 Query / Exec
// 使用方法 db.Query("SELECT * FROM table") db.Exec("INSERT INTO table VALUES (1)")
- Query:执行需要返回 rows 的操作,例如 SELECT)不释放连接,但在调用后仍然保持连接,即放回 freeConn。
-
Exec:执行没有返回 rows 的操作,例如 INSERT, UPDATE,DELETE)在调用后自动释放连接。
关键方法
// conn 返回新打开的连接,或者从连接池freeConn中取 func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) { ... // 检查上下文是否被取消 select { default: case <-ctx.Done(): db.mu.Unlock() return nil, ctx.Err() } lifetime := db.maxLifetime // cachedOrNewConn 模式获取连接 numFree := len(db.freeConn) if strategy == cachedOrNewConn && numFree > 0 { conn := db.freeConn[0] copy(db.freeConn, db.freeConn[1:]) db.freeConn = db.freeConn[:numFree-1] conn.inUse = true db.mu.Unlock() if conn.expired(lifetime) { conn.Close() return nil, driver.ErrBadConn } return conn, nil } // 如果连接数已经超过限制,将该请求放入connRequest中阻塞,直到有空闲连接 if db.maxOpen > 0 && db.numOpen >= db.maxOpen { // Make the connRequest channel. It\'s buffered so that the // connectionOpener doesn\'t block while waiting for the req to be read. req := make(chan connRequest, 1) reqKey := db.nextRequestKeyLocked() db.connRequests[reqKey] = req db.mu.Unlock() // 上下文判断请求超时 select { case <-ctx.Done(): // 删除 connRequests 中阻塞的请求 db.mu.Lock() delete(db.connRequests, reqKey) db.mu.Unlock() select { default: case ret, ok := <-req: if ok { // 如果收到了连接,由于超时了,回收该连接 db.putConn(ret.conn, ret.err) } } return nil, ctx.Err() // 获取到了连接,返回处理 case ret, ok := <-req: if !ok { return nil, errDBClosed } if ret.err == nil && ret.conn.expired(lifetime) { ret.conn.Close() return nil, driver.ErrBadConn } return ret.conn, ret.err } } // 连接池中没有连接,且打开的连接数没有超限,创建新连接 db.numOpen++ // optimistically db.mu.Unlock() ci, err := db.driver.Open(db.dsn) if err != nil { db.mu.Lock() db.numOpen-- // correct for earlier optimism db.maybeOpenNewConnections() db.mu.Unlock() return nil, err } db.mu.Lock() dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, } db.addDepLocked(dc, dc) dc.inUse = true db.mu.Unlock() return dc, nil }
连接的回收或释放
被动回收或释放
我们沿着上面的 Query 请求分析下来,在 queryConn 的方法中会看到一个 releaseConn 的方法,它调用了putConn
方法去处理这个 dc 连接。
func (dc *driverConn) releaseConn(err error) { dc.db.putConn(dc, err) }
再来看看 putConn 方法的定义
// 把 dc 连接放回连接池 freeConn 或者释放 func (db *DB) putConn(dc *driverConn, err error) { db.mu.Lock() // 回收一个没被使用的连接 会panic if !dc.inUse { if debugGetPut { fmt.Printf("putConn(%v) DUPLICATE was: %s\\n\\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc]) } panic("sql: connection returned that was never out") } if debugGetPut { db.lastPut[dc] = stack() } // 将该连接置为 未使用 dc.inUse = false // 执行完该连接的函数 for _, fn := range dc.onPut { fn() } dc.onPut = nil // 不重用无效的连接 if err == driver.ErrBadConn { // 该函数会判断 阻塞在 connRequest 的请求数量,然后在不超限的情况下,通过 openerCh 唤醒 connectionOpener goroutine 创建新连接处理请求。 db.maybeOpenNewConnections() db.mu.Unlock() // 释放连接 dc.Close() return } if putConnHook != nil { putConnHook(db, dc) } // 如果是有效的连接,将该连接给 阻塞在 connRequest 的请求使用,或者放回连接池 added := db.putConnDBLocked(dc, nil) db.mu.Unlock() // 改连接没被回收,释放 if !added { dc.Close() } }
主动回收或释放
除了上述的连接回收释放方式,还有没有其他地方回收释放呢。当我们设置 db.SetConnMaxLifetime
也就是设置连接的最大存活时间时,都会调起一个 goroutine 负责处理连接池中过期的连接。同 openerCh 信号一样,释放也用到了一个 cleanerCh 用于通知该 goroutine 处理任务。
func (db *DB) SetConnMaxLifetime(d time.Duration) { if d < 0 { d = 0 } db.mu.Lock() // 当缩小 maxLifetime 的时候,直接清理不符的连接 if d > 0 && d < db.maxLifetime && db.cleanerCh != nil { select { case db.cleanerCh <- struct{}{}: default: } } db.maxLifetime = d // 该方法会起一个 goroutine 负责释放过期连接 db.startCleanerLocked() db.mu.Unlock() } // 满足条件开启一个 goroutine 维护过期的连接 func (db *DB) startCleanerLocked() { if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil { db.cleanerCh = make(chan struct{}, 1) go db.connectionCleaner(db.maxLifetime) } } // 核心的逻辑 func (db *DB) connectionCleaner(d time.Duration) { const minInterval = time.Second if d < minInterval { d = minInterval } t := time.NewTimer(d) // 阻塞等待 cleanerCh 信号 for { select { case <-t.C: case <-db.cleanerCh: // maxLifetime 修改 或者 db 关闭会发送该信号 } db.mu.Lock() d = db.maxLifetime if db.closed || db.numOpen == 0 || d <= 0 { db.cleanerCh = nil db.mu.Unlock() return } expiredSince := nowFunc().Add(-d) var closing []*driverConn // 从连接池 freeConn 获取过期连接 for i := 0; i < len(db.freeConn); i++ { c := db.freeConn[i] if c.createdAt.Before(expiredSince) { closing = append(closing, c) last := len(db.freeConn) - 1 db.freeConn[i] = db.freeConn[last] db.freeConn[last] = nil db.freeConn = db.freeConn[:last] i-- } } db.mu.Unlock() // 释放连接 for _, c := range closing { c.Close() } if d < minInterval { d = minInterval } t.Reset(d) } }
小结
现在回过头来看看开始的三个问题,基本就有解了。
- sql 的连接池的连接怎么维护的?
有效的连接存储在连接池 freeConn 中。启用一个connectionOpener
goroutine 通过接受openerCh
信号负责调用驱动的 Open 方法创建连接。当用db.SetConnMaxLifetime
设置MaxLifetime
或者调用putConnDBLocked
方法满足条件时候会启用一个connectionCleaner
goroutine 通过接受cleanerCh
信号负责清理连接。 - Query / Exec 如何获取查询的连接?
- 先查看 freeConn 是否有可用的连接,如果有就从连接池取。如果没有进入下一步。
- 判断当前连接数是否超限。如果超限,将该请求放入 connRequests 阻塞等待可用连接。如果没超限进入下一步。
- 创建新的连接
- 连接池的连接如何回收/释放的?
- 被动回收/释放。通过查询等操作返回错误的时候会执行
releaseConn
函数回收连接。当满足条件时 会起一个connectionCleaner
goroutine 清理连接池的无效连接。 - 主动回收/释放。设置
db.SetConnMaxLifetime
的时候会触发一次connectionCleaner
goroutine 清理连接池。
- 被动回收/释放。通过查询等操作返回错误的时候会执行
可以看到sql库的连接池的实现机制其实还是蛮复杂的,生产者connectionOpener
goroutine 阻塞监听 openerCh 创建连接放入连接池。当请求来时,先查询连接池有没有空闲连接,如果没有空闲连接则创建,Query 类的请求用完继续放回连接池重用。当需要清理连接池的连接时调用connectionCleaner
goroutine。分析过一遍,以后遇到问题会更快的处理。
以上是关于golang 允许连接池的抽象包装器的主要内容,如果未能解决你的问题,请参考以下文章