Go 连接池的设计与实现
Posted 亚洲第一中锋_哈达迪
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go 连接池的设计与实现相关的知识,希望对你有一定的参考价值。
为什么需要连接池
如果不用连接池,而是每次请求都创建一个连接是比较昂贵的,因此需要完成3次tcp握手
同时在高并发场景下,由于没有连接池的最大连接数限制,可以创建无数个连接,耗尽文件描述符
连接池就是为了复用这些创建好的连接
连接池设计
基本上连接池都会设计以下几个参数:
-
初始连接数:在初始化连接池时就会预先创建好的连接数量,如果设置得:
- 过大:可能造成浪费
- 过小:请求到来时需要新建连接
-
最大空闲连接数maxIdle:池中最大缓存的连接个数,如果设置得:
- 过大:造成浪费,自己不用还把持着连接。因为数据库整体的连接数是有限的,当前进程占用多了,其他进程能获取的就少了
- 过小:无法应对突发流量
-
最大连接数maxCap:
- 如果已经用了maxCap个连接,要申请第maxCap+1个连接时,一般会阻塞在那里,直到超时或者别人归还一个连接
-
最大空闲时间idleTimeout:当发现某连接空闲超过这个时间时,会将其关闭,重新去获取连接
- 避免连接长时间没用,自动失效的问题
连接池对外提供两个方法,Get
:获取一个连接,Put
:归还一个连接
大部分连接池的实现大同小异,基本流程如下:
Get
需要注意:
-
当有空闲连接时,需要进一步判断连接是否有过期(超过最大空闲时间idleTimeout)
- 这些连接有可能很久没用过了,在数据库层面已经过期。如果贸然使用可能出现错误,因此最好检查下是否超时
-
当陷入阻塞时,最好设置超时时间,避免一直没等到有人归还连接而一直阻塞
Put
归还连接时:
- 先看有没有阻塞的获取连接的请求,如果有转交连接,并唤醒阻塞请求
- 否则看能否放回去空闲队列,如果不能直接关闭请求
总结
根据上面总结的流程,连接池还需要维护另外两个结构:
- 空闲队列
- 阻塞请求的队列
开源实现
接下来看几个开源连接池的实现,都大体符合上面介绍的流程
silenceper/pool
代码地址:https://github.com/silenceper/pool
数据结构:
// channelPool 存放连接信息
type channelPool struct
mu sync.RWMutex
// 空闲连接
conns chan *idleConn
// 产生新连接的方法
factory func() (interface, error)
// 关闭连接的方法
close func(interface) error
ping func(interface) error
// 最大空闲时间,最大阻塞等待时间(实际没用到)
idleTimeout, waitTimeOut time.Duration
// 最大连接数
maxActive int
openingConns int
// 阻塞的请求
connReqs []chan connReq
可以看出,silenceper/pool
:
- 用channel实现了空闲连接队列
conns
- 为每个阻塞的请求创建一个channel,加入
connReqs
中。这样请求会阻塞在自己的channel上
Get:
func (c *channelPool) Get() (interface, error)
conns := c.getConns()
if conns == nil
return nil, ErrClosed
for
select
// 如果有空闲连接
case wrapConn := <-conns:
if wrapConn == nil
return nil, ErrClosed
//判断是否超时,超时则丢弃
if timeout := c.idleTimeout; timeout > 0
if wrapConn.t.Add(timeout).Before(time.Now())
//丢弃并关闭该连接
c.Close(wrapConn.conn)
continue
//判断是否失效,失效则丢弃,如果用户没有设定 ping 方法,就不检查
if c.ping != nil
if err := c.Ping(wrapConn.conn); err != nil
c.Close(wrapConn.conn)
continue
return wrapConn.conn, nil
// 没有空闲连接
default:
c.mu.Lock()
log.Debugf("openConn %v %v", c.openingConns, c.maxActive)
if c.openingConns >= c.maxActive
// 连接数已经达到上线,不能再创建连接
req := make(chan connReq, 1)
c.connReqs = append(c.connReqs, req)
c.mu.Unlock()
// 将自己阻塞在channel上
ret, ok := <-req
if !ok
return nil, ErrMaxActiveConnReached
// 再检查一次是否超时
if timeout := c.idleTimeout; timeout > 0
if ret.idleConn.t.Add(timeout).Before(time.Now())
//丢弃并关闭该连接
c.Close(ret.idleConn.conn)
continue
return ret.idleConn.conn, nil
// 没有超过最大连接数,创建一个新的连接
if c.factory == nil
c.mu.Unlock()
return nil, ErrClosed
conn, err := c.factory()
if err != nil
c.mu.Unlock()
return nil, err
c.openingConns++
c.mu.Unlock()
return conn, nil
这段代码基本符合上面介绍的Get流程,应该很好理解
需要注意:
- 当收到别人归还的连接狗,这里再检查了一次是否超时。但我认为这次检查是没必要的,因为别人刚用完,一般不可能超时
- 虽然在pool的数据结构定义中有
waitTimeOut
字段,但实际没有使用,即阻塞获取可能无限期阻塞,这是一个优化点
Put:
// Put 将连接放回pool中
func (c *channelPool) Put(conn interface) error
if conn == nil
return errors.New("connection is nil. rejecting")
c.mu.Lock()
if c.conns == nil
c.mu.Unlock()
return c.Close(conn)
// 如果有请求在阻塞获取连接
if l := len(c.connReqs); l > 0
req := c.connReqs[0]
copy(c.connReqs, c.connReqs[1:])
c.connReqs = c.connReqs[:l-1]
// 将连接转交
req <- connReq
idleConn: &idleConnconn: conn, t: time.Now(),
c.mu.Unlock()
return nil
else
// 否则尝试是否能放回空闲连接队列
select
case c.conns <- &idleConnconn: conn, t: time.Now():
c.mu.Unlock()
return nil
default:
c.mu.Unlock()
//连接池已满,直接关闭该连接
return c.Close(conn)
值得注意的是:
- put方法唤醒阻塞请求时,从队头开始唤醒,这样先阻塞的请求先被唤醒,保证了
公平性
sql.DB
Go在官方库sql中就实现了连接池,这样的好处在于:
- 对于开发:就不用像java一样,需要自己找第三方的连接池实现
- 对于driver的实现:只用关心怎么和数据库交互,不用考虑连接池的问题
sql.DB
中和连接池相关的字段如下:
type DB struct
/**
...
*/
// 空闲连接队列
freeConn []*driverConn
// 阻塞请求的队列
connRequests map[uint64]chan connRequest
// 已经打开的连接
numOpen int // number of opened and pending open connections
// 最大空闲连接
maxIdle int // zero means defaultMaxIdleConns; negative means 0
// 最大连接数
maxOpen int // <= 0 means unlimited
// ...
继续看获取连接:
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error)
// 检测连接池是否被关闭
db.mu.Lock()
if db.closed
db.mu.Unlock()
return nil, errDBClosed
select
default:
// 检测ctx是否超时
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
lifetime := db.maxLifetime
db.numOpen++ // optimistically
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
db.mu.Lock()
dc := &driverConn
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
接下来检测是否有空闲连接:
numFree := len(db.freeConn)
// 如果有空闲连接
if strategy == cachedOrNewConn && numFree > 0
// 从队头取一个
conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock()
if conn.expired(lifetime)
conn.Close()
return nil, driver.ErrBadConn
// Reset the session if required.
if err := conn.resetSession(ctx); err == driver.ErrBadConn
conn.Close()
return nil, driver.ErrBadConn
return conn, nil
以上代码是1.14版本,但是到了1.18以后,获取空闲连接的方式发生了变化:
last := len(db.freeConn) - 1
if strategy == cachedOrNewConn && last >= 0
// 从最后一个位置获取连接
conn := db.freeConn[last]
db.freeConn = db.freeConn[:last]
conn.inUse = true
if conn.expired(lifetime)
db.maxLifetimeClosed++
db.mu.Unlock()
conn.Close()
return nil, driver.ErrBadConn
可以看出,1.14版本从队首获取,1.18改成从队尾获取连接
为啥从队尾拿连接?
- 因为队尾的连接是才放进去的,该连接过期的概率比队首连接小
继续看:
// 如果已经达到最大连接数
if db.maxOpen > 0 && db.numOpen >= db.maxOpen
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()
waitStart := time.Now()
// 阻塞当前请求,要么ctx超时,要么别人归还了连接
select
case <-ctx.Done():
db.mu.Lock()
// 把自己从阻塞队列中删除
delete(db.connRequests, reqKey)
db.mu.Unlock()
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
select
default:
case ret, ok := <-req:
if ok && ret.conn != nil
db.putConn(ret.conn, ret.err, false)
return nil, ctx.Err()
case ret, ok := <-req:
// 别人归还连接
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
if !ok
return nil, errDBClosed
if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime)
ret.conn.Close()
return nil, driver.ErrBadConn
if ret.conn == nil
return nil, ret.err
return ret.conn, ret.err
这里需要注意,在ctx超时分支中:
- 首先把自己从阻塞队列中删除
- 再检查一下req中是否有连接,如果有,将连接放回连接池
奇怪的是为啥把自己删除后,req
还可能收到连接呢?
因为put
连接时,会先拿出一个阻塞连接的req,如果这里删除req在put拿出req:
- 之前:那没问题,put不可能再放该req发送连接
- 之后:那有可能put往该req发送了连接,因此需要再检查下req中是否有连接,如果有归还
也解释了为啥阻塞队列要用map
:
- 用于快速找到自己的req,并删除
最后看看put:
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool
if db.closed
return false
if db.maxOpen > 0 && db.numOpen > db.maxOpen
return false
// 有阻塞的请求,转移连接
if c := len(db.connRequests); c > 0
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests
break
delete(db.connRequests, reqKey) // Remove from pending requests.
if err == nil
dc.inUse = true
req <- connRequest
conn: dc,
err: err,
return true
// 判断能否放回空闲队列
else if err == nil && !db.closed
if db.maxIdleConnsLocked() > len(db.freeConn)
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
db.maxIdleClosed++
return false
深入浅出Golang的协程池设计
使用Go语言实现并发的协程调度池阉割版,本文主要介绍协程池的基本设计思路,目的为深入浅出快速了解协程池工作原理,与真实的企业协程池还有很大差距,本文仅供学习参考。
一、何为并发,Go又是如何实现并发?
并行的好处:
同一时刻可以处理多个事务
更加节省时间,效率更高
具有并行处理能力的程序我们称之为“并发程序”
并发程序的处理能力优势体现在哪里?
二、Go语言如何实现并发?
1package main
2
3import "fmt"
4import "time"
5
6func go_worker(name string) {
7 for i := 0; i < 10; i++ {
8 fmt.Println("我是一个go协程, 我的名字是 ", name, "----")
9 time.Sleep(1 * time.Second)
10 }
11 fmt.Println(name, " 执行完毕!")
12}
13
14func main() {
15 go go_worker("小黑") //创建一个goroutine协程去执行 go_worker("小黑")
16 go go_worker("小白") //创建一个goroutine协程去执行 go_worker("小白")
17
18 //防止main函数执行完毕,程序退出
19 for {
20 time.Sleep(1 * time.Second)
21 }
22}
那么多个goroutine之前如何通信呢?
1package main
2
3import "fmt"
4
5func worker(c chan int) {
6 //从channel中去读数据
7 num := <-c
8 fmt.Println("foo recv channel ", num)
9}
10
11func main() {
12 //创建一个channel
13 c := make(chan int)
14
15 go worker(c)
16
17 //main协程 向一个channel中写数据
18 c <- 1
19
20 fmt.Println("send 1 -> channel over")
21}
三、协程池的设计思路
为什么需要协程池?
虽然go语言在调度Goroutine已经优化的非常完成,并且Goroutine作为轻量级执行流程,也不需要CPU调度器的切换,我们一般在使用的时候,如果想处理一个分支流程,直接go
一下即可。
但是,如果无休止的开辟Goroutine依然会出现高频率的调度Groutine,那么依然会浪费很多上下文切换的资源,导致做无用功。所以设计一个Goroutine池限制Goroutine的开辟个数在大型并发场景还是必要的。
四、快速实现并发协程通讯池
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8/* 有关Task任务相关定义及操作 */
9//定义任务Task类型,每一个任务Task都可以抽象成一个函数
10type Task struct {
11 f func() error //一个无参的函数类型
12}
13
14//通过NewTask来创建一个Task
15func NewTask(f func() error) *Task {
16 t := Task{
17 f: f,
18 }
19
20 return &t
21}
22
23//执行Task任务的方法
24func (t *Task) Execute() {
25 t.f() //调用任务所绑定的函数
26}
27
28/* 有关协程池的定义及操作 */
29//定义池类型
30type Pool struct {
31 //对外接收Task的入口
32 EntryChannel chan *Task
33
34 //协程池最大worker数量,限定Goroutine的个数
35 worker_num int
36
37 //协程池内部的任务就绪队列
38 JobsChannel chan *Task
39}
40
41//创建一个协程池
42func NewPool(cap int) *Pool {
43 p := Pool{
44 EntryChannel: make(chan *Task),
45 worker_num: cap,
46 JobsChannel: make(chan *Task),
47 }
48
49 return &p
50}
51
52//协程池创建一个worker并且开始工作
53func (p *Pool) worker(work_ID int) {
54 //worker不断的从JobsChannel内部任务队列中拿任务
55 for task := range p.JobsChannel {
56 //如果拿到任务,则执行task任务
57 task.Execute()
58 fmt.Println("worker ID ", work_ID, " 执行完毕任务")
59 }
60}
61
62//让协程池Pool开始工作
63func (p *Pool) Run() {
64 //1,首先根据协程池的worker数量限定,开启固定数量的Worker,
65 // 每一个Worker用一个Goroutine承载
66 for i := 0; i < p.worker_num; i++ {
67 go p.worker(i)
68 }
69
70 //2, 从EntryChannel协程池入口取外界传递过来的任务
71 // 并且将任务送进JobsChannel中
72 for task := range p.EntryChannel {
73 p.JobsChannel <- task
74 }
75
76 //3, 执行完毕需要关闭JobsChannel
77 close(p.JobsChannel)
78
79 //4, 执行完毕需要关闭EntryChannel
80 close(p.EntryChannel)
81}
82
83//主函数
84func main() {
85 //创建一个Task
86 t := NewTask(func() error {
87 fmt.Println(time.Now())
88 return nil
89 })
90
91 //创建一个协程池,最大开启3个协程worker
92 p := NewPool(3)
93
94 //开一个协程 不断的向 Pool 输送打印一条时间的task任务
95 go func() {
96 for {
97 p.EntryChannel <- t
98 }
99 }()
100
101 //启动协程池p
102 p.Run()
103
104}
版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。
Golang语言社区
ID:Golangweb
www.GolangWeb.com
游戏服务器架构丨分布式技术丨大数据丨游戏算法学习
以上是关于Go 连接池的设计与实现的主要内容,如果未能解决你的问题,请参考以下文章