连接池那点事

Posted Python那些事

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了连接池那点事相关的知识,希望对你有一定的参考价值。

连接池的作用就是为了提高性能。

 

连接池的作用:连接池是将已经创建好的连接保存在池中,当有请求来时,直接使用已经创建好的连接对Server端进行访问。这样省略了创建连接和销毁连接的过程。这样性能上得到了提高。

 

基本原理是这样的: 
(1)建立连接池对象(服务器启动)。 
(2)按照事先指定的参数创建初始数量的连接(即:空闲连接数)。 
(3)对于一个访问请求,直接从连接池中得到一个连接。如果连接池对象中没有空闲的连接,且连接数没有达到最大(即:最大活跃连接数),创建一个新的连接;如果达到最大,则设定一定的超时时间,来获取连接。 
(4)运用连接访问Server端服务。 
(5)释放连接(此时的释放连接,并非真正关闭,而是将其放入空闲队列中。如实际空闲连接数大于初始空闲连接数则释放连接)。 

(6)释放连接池对象(服务器停止、维护期间,释放连接池对象,并释放所有连接)。


说的通俗点,可以把连接池理解为一个一个的管道,在管道空闲时,便可以取出使用;同时,也可以铺设新的管道(当然不能超过最大连接数的限制)。使用完之后,管道就变为空闲了。


通常比较常用的连接池是数据库连接池,HTTP Client连接池,小编也自己编写过连接池,如Thrift连接池及插入Rabbitmq队列的连接池。


小编设计的一个连接池如下:



以RabbitMQ队列插入消息的连接池为例,Python代码如下:


# coding:utf-8
import logging

import threading
import Queue
from kombu import Connection
import time


class InsertQueue():
   def __init__(self, host=None, port=None, virtual_host=None, heartbeat_interval=3, name=None, password=None,
               
logger=None, maxIdle=10, maxActive=50, timeout=30, disable_time=20):
       """
       
:param str host: Hostname or IP Address to connect to
       
:param int port: TCP port to connect to
       
:param str virtual_host: RabbitMQ virtual host to use
       
:param int heartbeat_interval:  How often to send heartbeats
       
:param str name: auth credentials name

       :param str password: auth credentials password
       """
       
self.logger = logging if logger is None else logger
       self.host = host
       self.port = port
       self.virtual_host = virtual_host
       self.heartbeat_interval = heartbeat_interval
       self.name = name
       self.password = password
       self.mutex = threading.RLock()
       self.maxIdle = maxIdle
       self.maxActive = maxActive
       self.available = self.maxActive
       self.timeout = timeout
       self._queue = Queue.Queue(maxsize=self.maxIdle)
       self.disable_time = disable_time



def get_new_connection_pipe(self):
       """
       
产生新的队列连接
       
:return:
       """

       
with self.mutex:
           if self.available <= 0:
               raise GetConnectionException
           self.available -= 1

       try:

           conn = Connection(hostname=self.host,
                             
port=self.port,
                             
virtual_host=self.virtual_host,
                             
heartbeat=self.heartbeat_interval,
                             
userid=self.name,
                             
password=self.password)
           producer = conn.Producer()

           return ConnectionPipe(conn, producer)
       except:
           with self.mutex:
               self.available += 1
           
raise GetConnectionException


def get_connection_pipe(self):
       """
       
获取连接
       
:return:
       """

       try:
           connection_pipe = self._queue.get(False)
       except Queue.Empty:
           try:
               connection_pipe = self.get_new_connection_pipe()
           except GetConnectionException:
               timeout = self.timeout
               try:
                   connection_pipe = self._queue.get(timeout=timeout)
               except Queue.Empty:
                   try:
                       connection_pipe = self.get_new_connection_pipe()
                   except GetConnectionException:
                       logging.error("Too much connections, Get Connection Timeout!")
       if (time.time() - connection_pipe.use_time) > self.disable_time:
           self.close(connection_pipe)
           return self.get_connection_pipe()
       return connection_pipe


def close(self, connection_pipe):
       """
       close the connection and the correlative channel
       
:param connection_pipe:
       
:return:
       """
       
with self.mutex:
           self.available += 1
           
connection_pipe.close()
       return


   def insert_message(self, exchange=None, body=None, routing_key='', mandatory=True):
       """
       insert message to queue
       
:param str exchange: exchange name
       
:param str body: message
       
:param str routing_key: routing key
       
:param bool mandatory: is confirm: True means confirm, False means not confirm
       
:return:
       """


       put_into_queue_flag = True
       
insert_result = False
       
connection_pipe = None
       
try:

           connection_pipe = self.get_connection_pipe()
           producer = connection_pipe.channel
           use_time = time.time()
           producer.publish(exchange=exchange,
                                           
body=body,
                                           
delivery_mode=2,
                                           
routing_key=routing_key,
                                           
mandatory=mandatory
                                            )
           insert_result = True

       
except Exception:
           insert_result = False
           
put_into_queue_flag = False
       
finally:

           if put_into_queue_flag is True:
               try:
                   connection_pipe.use_time = use_time
                   self._queue.put_nowait(connection_pipe)
               except Queue.Full:
                   self.close(connection_pipe)
           else:
               if connection_pipe is not None:
                   self.close(connection_pipe)

       return insert_result



class ConnectionPipe(object):
   """
   connection
channel对象的封装
   """

   
def __init__(self, connection, channel):
       self.connection = connection
       self.channel = channel
       self.use_time = time.time()

   def close(self):
       try:
           self.connection.close()
       except Exception as ex:
           pass



class GetConnectionException():
   """
   
获取连接异常
   """
   
pass


你学到了连接池的精髓了吗?

以上是关于连接池那点事的主要内容,如果未能解决你的问题,请参考以下文章

HTML那点事

window 7 更新那点事!

代码提交那点事

require 那点事

ThreadLocal那点事

null 和{}的那点事