A Connection Pool Flaw in the Golang MongoDB Driver, and a Fix
Posted by 佳佳的笔记本
1 Background
Recently our production service intermittently failed to connect to MongoDB. Investigation showed that our cluster held far too many connections, which knocked over the MongoDB proxy; the proxy could not recover on its own and needed manual intervention from a DBA. Our Go framework uses the mgo driver and caps the pool size, so why did the connection count still blow past the limit, and what can we do about it?
2 Approaches
Our goal is to keep the system available under high concurrency: the proxy must not be knocked over, the service must stay basically usable, and requests beyond the limit should be degraded to a fallback path. After internal discussion, we came up with the following three approaches.
2.1 Limit concurrency at the application layer
Since the mgo driver's pool cannot cap concurrency, we would have to cap it in the application layer. Our MongoDB proxy allows roughly 4000 connections and the cluster has 80 machines, so each machine gets at most 50 connections. The business spans many endpoints, so we cannot simply bound the number of goroutines; we would have to build another MongoDB connection-pool layer to control concurrency, which is costly to implement.
2.2 Replace MongoDB with Redis
If MongoDB is this fragile, why not switch to high-performance Redis? But the data volume is large: we would need to request a dedicated Redis cluster, re-import all the data, and review and modify every piece of MongoDB-related logic in the application. The development cycle would be long.
2.3 Make the connection pool enforce its limit
Back to square one: analyze the source code to understand why the connection count cannot be limited, then fix the problem. This is the most direct and effective approach and worth a try.
3 Source analysis
First, look at the mgo function that sets the pool size. Its comment says the driver does not support capping concurrency and that the application should do it "at the door" instead. But why can't the driver enforce it?
// SetPoolLimit sets the maximum number of sockets in use in a single server
// before this session will block waiting for a socket to be available.
// The default limit is 4096.
//
// This limit must be set to cover more than any expected workload of the
// application. It is a bad practice and an unsupported use case to use the
// database driver to define the concurrency limit of an application. Prevent
// such concurrency "at the door" instead, by properly restricting the amount
// of used resources and number of goroutines before they are created.
func (s *Session) SetPoolLimit(limit int) {
	s.m.Lock()
	s.poolLimit = limit
	s.m.Unlock()
}
3.1 Data structures
The mgo driver exposes the Session structure to the application layer to represent a MongoDB connection, as shown below:
// Session represents a communication session with the database.
//
// All Session methods are concurrency-safe and may be called from multiple
// goroutines. In all session modes but Eventual, using the session from
// multiple goroutines will cause them to share the same underlying socket.
// See the documentation on Session.SetMode for more details.
type Session struct {
	m                sync.RWMutex
	cluster_         *mongoCluster
	slaveSocket      *mongoSocket
	masterSocket     *mongoSocket
	slaveOk          bool
	consistency      Mode
	queryConfig      query
	safeOp           *queryOp
	syncTimeout      time.Duration
	sockTimeout      time.Duration
	defaultdb        string
	sourcedb         string
	dialCred         *Credential
	creds            []Credential
	poolLimit        int
	bypassValidation bool
}
The cluster_ field carries the MongoDB cluster information; its structure is:
// Mongo cluster encapsulation.
//
// A cluster enables the communication with one or more servers participating
// in a mongo cluster. This works with individual servers, a replica set,
// a replica pair, one or multiple mongos routers, etc.
type mongoCluster struct {
	sync.RWMutex
	serverSynced sync.Cond
	userSeeds    []string
	dynaSeeds    []string
	servers      mongoServers
	masters      mongoServers
	references   int
	syncing      bool
	direct       bool
	failFast     bool
	syncCount    uint
	setName      string
	cachedIndex  map[string]bool
	sync         chan bool
	dial         dialer
}
The servers and masters fields hold the information about the data-bearing nodes; their structure is:
// Mongo server encapsulation.
type mongoServer struct {
	sync.RWMutex
	Addr          string
	ResolvedAddr  string
	tcpaddr       *net.TCPAddr
	unusedSockets []*mongoSocket
	liveSockets   []*mongoSocket
	closed        bool
	abended       bool
	sync          chan bool
	dial          dialer
	pingValue     time.Duration
	pingIndex     int
	pingCount     uint32
	pingWindow    [6]time.Duration
	info          *mongoServerInfo
}
unusedSockets holds the idle socket connections; liveSockets holds every socket that is currently alive, both idle and in use.
3.2 Key functions
Follow the call chain all the way down to the function that ultimately acquires a socket:
1) Query data: collection.Find(bson.M{"a": 1}).One(&result)
2) Get a usable connection from the session: session.acquireSocket
3) Get a usable connection from the cluster: s.cluster().AcquireSocket
4) Get a usable connection from the server: server.AcquireSocket
// AcquireSocket returns a socket for communicating with the server.
// This will attempt to reuse an old connection, if one is available. Otherwise,
// it will establish a new one. The returned socket is owned by the call site,
// and will return to the cache when the socket has its Release method called
// the same number of times as AcquireSocket + Acquire were called for it.
// If the poolLimit argument is greater than zero and the number of sockets in
// use in this server is greater than the provided limit, errPoolLimit is
// returned.
func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
	for {
		server.Lock()
		abended = server.abended
		if server.closed {
			server.Unlock()
			return nil, abended, errServerClosed
		}
		n := len(server.unusedSockets)
		if poolLimit > 0 && len(server.liveSockets)-n >= poolLimit {
			server.Unlock()
			return nil, false, errPoolLimit
		}
		if n > 0 {
			socket = server.unusedSockets[n-1]
			server.unusedSockets[n-1] = nil // Help GC.
			server.unusedSockets = server.unusedSockets[:n-1]
			info := server.info
			server.Unlock()
			err = socket.InitialAcquire(info, timeout)
			if err != nil {
				continue
			}
		} else {
			server.Unlock()
			socket, err = server.Connect(timeout)
			if err == nil {
				server.Lock()
				// We've waited for the Connect, see if we got
				// closed in the meantime
				if server.closed {
					server.Unlock()
					socket.Release()
					socket.Close()
					return nil, abended, errServerClosed
				}
				server.liveSockets = append(server.liveSockets, socket)
				server.Unlock()
			}
		}
		return
	}
	panic("unreachable")
}
We have finally hit bottom. This function works as follows:
1) If the pool is over the limit, return an error immediately
2) If an idle connection exists, reuse it
3) Otherwise, dial a new connection to MongoDB
4) When the caller is done, the connection goes back onto the idle list
Now look at the implementation again: do you see the problem? Think about it for a minute, then walk through the following sequence of events.
1) Suppose unusedSockets is empty, liveSockets holds 1 socket, the pool limit is 2, and a wave of requests arrives
2) The first request takes the lock, sees the pool under the limit and no idle connection, releases the lock, and starts dialing on its own...
3) The second request takes the lock, also sees the pool under the limit and no idle connection, releases the lock, and starts dialing too...
4) Once both dials succeed, liveSockets holds 3 sockets, past the limit. With 100 or 1000 concurrent requests, the overshoot is far worse
3.3 The fix
The limit check is unsafe because it ignores connections that are still being dialed, so under high concurrency the count overshoots. We fix this by tracking the number of in-flight dials and counting them against the limit, as shown below:
// Mongo server encapsulation.
type mongoServer struct {
	sync.RWMutex
	Addr          string
	ResolvedAddr  string
	tcpaddr       *net.TCPAddr
	unusedSockets []*mongoSocket
	liveSockets   []*mongoSocket
	connectingNum int32 // number of in-flight dials
	closed        bool
	abended       bool
	sync          chan bool
	dial          dialer
	pingValue     time.Duration
	pingIndex     int
	pingCount     uint32
	pingWindow    [6]time.Duration
	info          *mongoServerInfo
}

// AcquireSocket returns a socket for communicating with the server.
// This will attempt to reuse an old connection, if one is available. Otherwise,
// it will establish a new one. The returned socket is owned by the call site,
// and will return to the cache when the socket has its Release method called
// the same number of times as AcquireSocket + Acquire were called for it.
// If the poolLimit argument is greater than zero and the number of sockets in
// use in this server is greater than the provided limit, errPoolLimit is
// returned.
func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
	for {
		server.Lock()
		abended = server.abended
		if server.closed {
			server.Unlock()
			return nil, abended, errServerClosed
		}
		n := len(server.unusedSockets)
		if n > 0 {
			socket = server.unusedSockets[n-1]
			server.unusedSockets[n-1] = nil // Help GC.
			server.unusedSockets = server.unusedSockets[:n-1]
			info := server.info
			server.Unlock()
			err = socket.InitialAcquire(info, timeout)
			if err != nil {
				continue
			}
		} else {
			// Count in-flight dials against the pool limit
			if poolLimit > 0 && int(server.connectingNum)+len(server.liveSockets) >= poolLimit {
				server.Unlock()
				return nil, false, errPoolLimit
			}
			server.connectingNum++
			server.Unlock()
			// Decrement under the same lock that guards the increment;
			// mixing a mutex-guarded counter with atomic operations
			// would itself be a data race.
			defer func() {
				server.Lock()
				server.connectingNum--
				server.Unlock()
			}()
			socket, err = server.Connect(timeout)
			if err == nil {
				server.Lock()
				// We've waited for the Connect, see if we got
				// closed in the meantime
				if server.closed {
					server.Unlock()
					socket.Release()
					socket.Close()
					return nil, abended, errServerClosed
				}
				server.liveSockets = append(server.liveSockets, socket)
				server.Unlock()
			}
		}
		return
	}
	panic("unreachable")
}
3.4 Test demo
Below is a test program: it sets the pool limit to 10 and the concurrency to 100, so you can watch whether the MongoDB connection count stays bounded. Try it yourself if you are interested.
package main

import (
	"fmt"
	"sync"
	"time"

	mgo "gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

var globalSession *mgo.Session

func main() {
	var err error
	globalSession, err = mgo.Dial("127.0.0.1:27000")
	// Check the error before touching the session; calling methods
	// first would dereference a nil session on a failed dial.
	if err != nil {
		fmt.Println("connect db fail")
		return
	}
	globalSession.SetSocketTimeout(1 * time.Second)
	globalSession.SetPoolLimit(10)

	var wg sync.WaitGroup
	concurrent := 100
	wg.Add(concurrent)
	for i := 0; i < concurrent; i++ {
		go func(index int) {
			defer wg.Done()
			testMongo(index)
		}(i)
	}
	wg.Wait()
}

func testMongo(index int) {
	session := globalSession.Clone()
	defer session.Close()
	var result interface{}
	err := session.DB("local").C("startup_log").Find(bson.M{"hostname": "www.baidu.com"}).One(&result)
	if err != nil {
		fmt.Println(index, "query fail")
	} else {
		fmt.Println(index, "query ok")
	}
}
4 Summary
This article analyzed, at the source level, why the connection limit in the Go MongoDB driver mgo is unsafe, and proposed a fix. My understanding is limited and there may well be mistakes or oversights; corrections are welcome.