Python mysql(使用pymysql)自动重新连接
Posted
技术标签:
【中文标题】Python mysql(使用pymysql)自动重新连接【英文标题】:Python mysql (using pymysql) auto reconnect 【发布时间】:2014-05-07 03:30:24 【问题描述】:我不确定这是否可能,但我正在寻找一种在连接丢失时重新连接到 mysql 数据库的方法。所有连接都保存在 gevent 队列中,但我认为这无关紧要。我敢肯定,如果我花一些时间,我可以想出一种重新连接到数据库的方法。但是,我浏览了 pymysql 代码,发现 Connection 类中有一个“ping”方法,我不确定如何使用。
该方法看起来会第一次重新连接,但之后又将重新连接标志切换为 False?我可以使用这种方法吗,或者如果连接丢失,是否有不同的方法来建立连接?即使不是 pymysql,人们如何解决数据库服务器宕机,不得不重新建立与 mysql 服务器的连接?
def ping(self, reconnect=True):
''' Check if the server is alive '''
if self.socket is None:
if reconnect:
self._connect()
reconnect = False
else:
raise Error("Already closed")
try:
self._execute_command(COM_PING, "")
return self._read_ok_packet()
except Exception:
if reconnect:
self._connect()
return self.ping(False)
else:
raise
【问题讨论】:
不确定这是否有用,但请查看 Twisted gist.github.com/powdahound/174056 的 ReconnectingConnectionPool 配方 我已经实现了 - gist.github.com/opensourcegeek/9822127 在运行查询之前 Ping 被认为是一种浪费资源且不可靠的反模式:percona.com/blog/2010/05/05/… 我在 PyMySQL 文档中找到了使用“连接”的“ping”方法的解决方案,并举了一个例子here 【参考方案1】:终于找到了一个可行的解决方案,可能会对某人有所帮助。
from gevent import monkey
monkey.patch_socket()
import logging
import gevent
from gevent.queue import Queue
import pymysql as db
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger("connection_pool")
class ConnectionPool:
def __init__(self, db_config, time_to_sleep=30, test_run=False):
self.username = db_config.get('user')
self.password = db_config.get('password')
self.host = db_config.get('host')
self.port = int(db_config.get('port'))
self.max_pool_size = 20
self.test_run = test_run
self.pool = None
self.time_to_sleep = time_to_sleep
self._initialize_pool()
def get_initialized_connection_pool(self):
return self.pool
def _initialize_pool(self):
self.pool = Queue(maxsize=self.max_pool_size)
current_pool_size = self.pool.qsize()
if current_pool_size < self.max_pool_size: # this is a redundant check, can be removed
for _ in xrange(0, self.max_pool_size - current_pool_size):
try:
conn = db.connect(host=self.host,
user=self.username,
passwd=self.password,
port=self.port)
self.pool.put_nowait(conn)
except db.OperationalError, e:
LOGGER.error("Cannot initialize connection pool - retrying in seconds".format(self.time_to_sleep))
LOGGER.exception(e)
break
self._check_for_connection_loss()
def _re_initialize_pool(self):
gevent.sleep(self.time_to_sleep)
self._initialize_pool()
def _check_for_connection_loss(self):
while True:
conn = None
if self.pool.qsize() > 0:
conn = self.pool.get()
if not self._ping(conn):
if self.test_run:
self.port = 3306
self._re_initialize_pool()
else:
self.pool.put_nowait(conn)
if self.test_run:
break
gevent.sleep(self.time_to_sleep)
def _ping(self, conn):
try:
if conn is None:
conn = db.connect(host=self.host,
user=self.username,
passwd=self.password,
port=self.port)
cursor = conn.cursor()
cursor.execute('select 1;')
LOGGER.debug(cursor.fetchall())
return True
except db.OperationalError, e:
LOGGER.warn('Cannot connect to mysql - retrying in seconds'.format(self.time_to_sleep))
LOGGER.exception(e)
return False
# test (pytest compatible) -------------------------------------------------------------------------------------------
import logging
from src.py.ConnectionPool import ConnectionPool
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger("test_connection_pool")
def test_get_initialized_connection_pool():
config =
'user': 'root',
'password': '',
'host': '127.0.0.1',
'port': 3305
conn_pool = ConnectionPool(config, time_to_sleep=5, test_run=True)
pool = conn_pool.get_initialized_connection_pool()
# when in test run the port will be switched back to 3306
# so the queue size should be 20 - will be nice to work
# around this rather than test_run hack
assert pool.qsize() == 20
【讨论】:
【参考方案2】:好吧,我在我的应用程序中遇到了同样的问题,我在PyMySQL documentation 上找到了一个方法,它可以 ping 到服务器并检查连接是否已关闭,如果已关闭,则它会再次重新连接。
from pymysql import connect
from pymysql.cursors import DictCursor
# create the connection
connection = connect(host='host', port='port', user='user',
password='password', db='db',
cursorclass=DictCursor)
# get the cursor
cursor = connection.cursor()
# if the connection was lost, then it reconnects
connection.ping(reconnect=True)
# execute the query
cursor.execute(query)
希望对你有帮助。
【讨论】:
当然,这应该是首选的解决方案。 值得注意的是,ping 是一个便利功能,reconnect=True 会执行 pymysql.connection.connect(),这意味着应该重新配置任何连接设置(模式、自动提交、tx 隔离)。 【参考方案3】:最简单的方法是在发送查询之前检查连接。
您可以通过创建一个包含两个方法的小类来做到这一点:connect
和 query
:
import pymysql
import pymysql.cursors
class DB:
def connect(self):
self.conn = pymysql.connect(
host=hostname,
user=username,
password=password,
db=dbname,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor,
port=3306)
def query(self, sql):
try:
cursor = self.conn.cursor()
cursor.execute(sql)
except pymysql.OperationalError:
self.connect()
cursor = self.conn.cursor()
cursor.execute(sql)
return cursor
db = DB()
现在,每当您使用 db.query("example SQL")
发送查询时,请求都会自动准备好遇到连接错误,并在需要时使用 self.connect()
重新连接。
记住:这是一个简化的例子。通常,您希望让 PyMySQL 帮助您转义查询中的特殊字符。为此,您必须在 query
方法中添加第二个参数并从那里开始。
【讨论】:
你永远不应该使用“except:”,甚至不限于异常。在这种情况下,它至少应该是 pymssql.StandardError。通常您不想捕获 SystemExit 异常 sys.exit(),例如,以防止进程关闭,因为继续执行此操作会使您的进程处于非常不稳定的状态。【参考方案4】:逻辑很简单,如果连接关闭则尝试重新连接几次,在这种情况下,我使用最大尝试 15 次重新连接或 ping。
import pymysql, pymysql.cursors
conn = pymysql.connect(
host=hostname,
user=username,
password=password,
db=dbname,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor,
)
cursor = conn.cursor()
# you can do transactions to database and when you need conn later, just make sure the server is still connected
if conn.open is False:
max_try = 15
try = 0
while conn.open is False:
if try < max_try:
conn.ping() # autoreconnect is true by default
try +=1
# check the conn again to make sure it connected
if conn.open:
# statements when conn is successfully reconnect to the server
else:
# it must be something wrong : server, network etc
【讨论】:
conn.open 不在 PEP 249 中,它似乎检查是否存在套接字对象(在 ssl 上下文中)。如果在底层套接字选择/轮询时连接不工作,则该检查可能无法保证当远程服务器消失时 conn.open 将返回 false。【参考方案5】:旧的,但我在访问程序中的托管数据库时遇到了类似的问题。我最终使用的解决方案是创建一个装饰器,以便在进行查询时自动重新连接。
给定一个连接函数:
def connect(self):
self.conn = mysql.connector.connect(host=self.host, user=self.user,
database=self.database, password=self.password)
self.cursor = self.conn.cursor()
print("Established connectionn...")
我创造了
def _reconnect(func):
@wraps(func)
def rec(self,*args,**kwargs):
try:
result = func(self,*args,**kwargs)
return result
except (mysql.connector.Error, mysql.connector.Warning) as e:
self.connect()
result = func(self,*args,**kwargs)
return result
return rec
这样任何使用连接的函数现在都可以这样装饰
@_reconnect
def check_user_exists(self,user_id):
self.cursor.execute("SELECT COUNT(*) FROM _ where user_id=;".format(user_id))
if self.cursor.fetchall()[0][0]==0:
return False
else:
return True
此装饰器将重新建立连接并重新运行任何涉及到数据库查询的函数。
【讨论】:
以上是关于Python mysql(使用pymysql)自动重新连接的主要内容,如果未能解决你的问题,请参考以下文章
用pymysql封装连接mysql数据库的工具类Python+Requests库做接口自动化框架设计系列多测师