Golang MongoDB 连接池缺陷及修复

Posted 佳佳的笔记本

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang MongoDB 连接池缺陷及修复相关的知识,希望对你有一定的参考价值。

1 背景

近期线上出现连不上MongoDB的情况,经排查发现我们现有集群的连接数太多,导致MongoDB proxy被打挂,proxy无法自动恢复,需要DBA手动操作。现有golang框架使用mgo驱动库,并且限制了连接池的大小,为什么还出现连接数超限呢?该怎么办?

2 方案

我们的目标是在高并发下保持系统可用性,保证proxy不被打挂,保证服务基本可用,针对超出限制的请求进行降级,走兜底策略。经过内部讨论,提出如下三种方案。

2.1 应用层限制并发数

既然mgo驱动库的连接池无法控制并发数,那就需要在应用层控制并发数。现有MongoDB proxy的最大连接数为4000左右,我们的集群总共有80台机器,每台机器最大连接数为50,而业务中涉及多个接口,无法直接限制goroutine数量,只能再设计一层MongoDB连接池控制并发数,这种方法实现成本较高。

2.2 Redis替换MongoDB

既然MongoDB这么脆弱,那就用高性能redis吧,因数据量较大,需要单独申请redis集群,重新灌入数据,同时应用层所有使用MongoDB相关的逻辑都需要检查和修改,这种方法开发周期较长。

2.3 连接池限制并发数

回到原点,从源码角度分析为什么无法限制连接数,然后修复问题,这种方法最直接高效,值得一试。

3 源码分析

首先看一下mgo设置连接池大小的函数,函数注释说不支持控制并发数,需要上层应用控制并发数,但是为什么不能控制呢?

// SetPoolLimit sets the maximum number of sockets in use in a single server
// before this session will block waiting for a socket to be available.
// The default limit is 4096.
//
// This limit must be set to cover more than any expected workload of the
// application. It is a bad practice and an unsupported use case to use the
// database driver to define the concurrency limit of an application. Prevent
// such concurrency "at the door" instead, by properly restricting the amount
// of used resources and number of goroutines before they are created.
func (s *Session) SetPoolLimit(limit int) {
   s.m.Lock()
   s.poolLimit = limit
   s.m.Unlock()
}

3.1 数据结构

mgo驱动给应用层提供session这个数据结构标识MongoDB连接信息,如下所示:

// Session represents a communication session with the database.
//
// All Session methods are concurrency-safe and may be called from multiple
// goroutines. In all session modes but Eventual, using the session from
// multiple goroutines will cause them to share the same underlying socket.
// See the documentation on Session.SetMode for more details.
type Session struct {
   m                sync.RWMutex
   cluster_         *mongoCluster
   slaveSocket      *mongoSocket
   masterSocket     *mongoSocket
   slaveOk          bool
   consistency      Mode
   queryConfig      query
   safeOp           *queryOp
   syncTimeout      time.Duration
   sockTimeout      time.Duration
   defaultdb        string
   sourcedb         string
   dialCred         *Credential
   creds            []Credential
   poolLimit        int
   bypassValidation bool
}

其中cluster_ 字段代表MongoDB的集群信息,其数据结构如下所示:

// Mongo cluster encapsulation.
//
// A cluster enables the communication with one or more servers participating
// in a mongo cluster.  This works with individual servers, a replica set,
// a replica pair, one or multiple mongos routers, etc.
type mongoCluster struct {
   sync.RWMutex
   serverSynced sync.Cond
   userSeeds    []string
   dynaSeeds    []string
   servers      mongoServers
   masters      mongoServers
   references   int
   syncing      bool
   direct       bool
   failFast     bool
   syncCount    uint
   setName      string
   cachedIndex  map[string]bool
   sync         chan bool
   dial         dialer
}

其中servers和masters中代表含数据节点的信息,其数据结构如下所示:

// Mongo server encapsulation.
type mongoServer struct {
   sync.RWMutex
   Addr          string
   ResolvedAddr  string
   tcpaddr       *net.TCPAddr
   unusedSockets []*mongoSocket
   liveSockets   []*mongoSocket
   closed        bool
   abended       bool
   sync          chan bool
   dial          dialer
   pingValue     time.Duration
   pingIndex     int
   pingCount     uint32
   pingWindow    [6]time.Duration
   info          *mongoServerInfo
}

unusedSockets代表未使用的socket连接,liveSockets代表使用中的socket连接。

3.2 关键函数

一直跟随代码到最终获取socket连接的函数

1) 查询数据 collection.Find(bson.M{"a": 1}).One(&result)

2) 从 session 获取有效连接 session.acquireSocket

3) 从 cluster 获取有效连接 s.cluster().AcquireSocket

4) 从 server 获取有效连接 server.AcquireSocket

// AcquireSocket returns a socket for communicating with the server.
// This will attempt to reuse an old connection, if one is available. Otherwise,
// it will establish a new one. The returned socket is owned by the call site,
// and will return to the cache when the socket has its Release method called
// the same number of times as AcquireSocket + Acquire were called for it.
// If the poolLimit argument is greater than zero and the number of sockets in
// use in this server is greater than the provided limit, errPoolLimit is
// returned.
func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
   for {
       server.Lock()
       abended = server.abended
       if server.closed {
           server.Unlock()
           return nil, abended, errServerClosed
       }
       n := len(server.unusedSockets)
       if poolLimit > 0 && len(server.liveSockets)-n >= poolLimit {
           server.Unlock()
           return nil, false, errPoolLimit
       }
       if n > 0 {
           socket = server.unusedSockets[n-1]
           server.unusedSockets[n-1] = nil // Help GC.
           server.unusedSockets = server.unusedSockets[:n-1]
           info := server.info
           server.Unlock()
           err = socket.InitialAcquire(info, timeout)
           if err != nil {
               continue
           }
       } else {
           server.Unlock()
           socket, err = server.Connect(timeout)
           if err == nil {
               server.Lock()
               // We've waited for the Connect, see if we got
               // closed in the meantime
               if server.closed {
                   server.Unlock()
                   socket.Release()
                   socket.Close()
                   return nil, abended, errServerClosed
               }
               server.liveSockets = append(server.liveSockets, socket)
               server.Unlock()
           }
       }
       return
   }
   panic("unreachable")
}

终于到底了,这个函数的实现思路如下:

1) 判断连接池是否超限,如果超限则直接返回错误

2) 如果有空闲连接则直接使用

3) 如果没有空闲连接则连接MongoDB

4) 连接使用完后会放回到空闲连接

大家再仔细看一下这个函数的实现是否有问题?思考一分钟,然后看一下这个时序图

1) 假设当前 unusedSockets 的大小为0,liveSockets 的大小为1,连接池大小为2,一大波请求正在袭来

2) 第一个请求拿到锁了,发现连接数没有超限,又没有可用的连接,那就释放锁,然后自己连接吧,连接中...

3) 第二个请求拿到锁,发现连接数没有超限,又没有可用的连接,那就释放锁,然后自己也连接吧,连接中...

4) 当这2个请求都连接成功后,当前 liveSockets 的大小变成3,超过连接数限制了,如果此时并发数是100,1000,那连接池超限会更严重

3.3 修复方法

判断连接数是否超限的逻辑不安全,并没有考虑这部分连接中的数量,因此高并发情况下,连接数会超限,对此,我们引入连接中的数量对源码进行改进,如下所示:

// Mongo server encapsulation.
type mongoServer struct {
   sync.RWMutex
   Addr          string
   ResolvedAddr  string
   tcpaddr       *net.TCPAddr
   unusedSockets []*mongoSocket
   liveSockets   []*mongoSocket
   connectingNum int32  // 连接中的数量
   closed        bool
   abended       bool
   sync          chan bool
   dial          dialer
   pingValue     time.Duration
   pingIndex     int
   pingCount     uint32
   pingWindow    [6]time.Duration
   info          *mongoServerInfo
}

// AcquireSocket returns a socket for communicating with the server.
// This will attempt to reuse an old connection, if one is available. Otherwise,
// it will establish a new one. The returned socket is owned by the call site,
// and will return to the cache when the socket has its Release method called
// the same number of times as AcquireSocket + Acquire were called for it.
// If the poolLimit argument is greater than zero and the number of sockets in
// use in this server is greater than the provided limit, errPoolLimit is
// returned.
func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
   for {
       server.Lock()
       abended = server.abended
       if server.closed {
           server.Unlock()
           return nil, abended, errServerClosed
       }
       n := len(server.unusedSockets)
       if n > 0 {
           socket = server.unusedSockets[n-1]
           server.unusedSockets[n-1] = nil // Help GC.
           server.unusedSockets = server.unusedSockets[:n-1]
           info := server.info
           server.Unlock()
           err = socket.InitialAcquire(info, timeout)
           if err != nil {
               continue
           }
       } else {
           // 引入连接中的数量
           if poolLimit > 0 && int(server.connectingNum)+len(server.liveSockets) >= poolLimit {
               server.Unlock()
               return nil, false, errPoolLimit
           }
           server.connectingNum++
           server.Unlock()
           defer atomic.AddInt32(&server.connectingNum, -1)
           socket, err = server.Connect(timeout)
           if err == nil {
               server.Lock()
               // We've waited for the Connect, see if we got
               // closed in the meantime
               if server.closed {
                   server.Unlock()
                   socket.Release()
                   socket.Close()
                   return nil, abended, errServerClosed
               }
               server.liveSockets = append(server.liveSockets, socket)
               server.Unlock()
           }
       }
       return
   }
   panic("unreachable")
}

3.4 测试Demo

下面给出一段测试代码,将连接池大小设置为10,并发数设置为100,最后看MongoDB的连接数是否能限制住,感兴趣的同学可以试试。

import (
   "fmt"
   "sync"
   "time"
   mgo "gopkg.in/mgo.v2"
   "gopkg.in/mgo.v2/bson"
)

var globalSession *mgo.Session

func main() {
   var err error
   globalSession, err = mgo.Dial("127.0.0.1:27000")
   globalSession.SetSocketTimeout(1 * time.Second)
   globalSession.SetPoolLimit(10)
   if err != nil {
       fmt.Println("connect db fail")
       return
   }
   var wg sync.WaitGroup
   concurrent := 100
   wg.Add(concurrent)
   for i := 0; i < concurrent; i++ {
       go func(index int) {
           defer wg.Done()
           testMongo(index)
       }(i)
   }
   wg.Wait()
}

func testMongo(index int) {
   session := globalSession.Clone()
   defer session.Close()
   var result interface{}
   err := session.DB("local").C("startup_log").Find(bson.M{"hostname": "www.baidu.com"}).One(&result)
   if err != nil {
       fmt.Println(index, "query fail")
   } else {
       fmt.Println(index, "query ok")
   }
}

4 总结

本文从源码角度分析golang MongoDB驱动库连接数限制不安全的问题,并提出一种修复方法。因本人水平有限,难免有错误或者考虑不周的地方,请多多指教。


以上是关于Golang MongoDB 连接池缺陷及修复的主要内容,如果未能解决你的问题,请参考以下文章

MongoDB Golang驱动mgo的连接池使用问题

golang 如何连接redis --- 2022-04-03

MongoDB连接池理解及测试 NodeJS

代码缺陷解读——不受控制的资源消耗及表达式永假永真

golang redis事务 --- 2022-04-03

node.js如何配置mongodb连接池?