如何实现python的mysql连接池并加入缓存过期
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何实现python的mysql连接池并加入缓存过期相关的知识,希望对你有一定的参考价值。
import mysqldbimport time
import string
import redis
class PooledConnection:
#构建连接池实例
def __init__(self, maxconnections, connstr,dbtype):
from Queue import Queue
self._pool = Queue(maxconnections) # create the queue
self.connstr = connstr
self.dbtype=dbtype
self.maxconnections=maxconnections
#根据你给数目来创建链接,并且写入刚才创建的队列里面。
try:
for i in range(maxconnections):
self.fillConnection(self.CreateConnection(connstr,dbtype))
except Exception,e:
raise e
def fillConnection(self,conn):
try:
self._pool.put(conn)
except Exception,e:
raise "fillConnection error:"+str(e)
def returnConnection(self, conn):
try:
self._pool.put(conn)
except Exception,e:
raise "returnConnection error:"+str(e)
def getConnection(self):
try:
return self._pool.get()
except Exception,e:
raise "getConnection error:"+str(e)
def ColseConnection(self,conn):
try:
self._pool.get().close()
self.fillConnection(self.CreateConnection(connstr,dbtype))
except Exception,e:
raise "CloseConnection error:"+str(e)
def CreateConnection(self,connstr,dbtype):
if dbtype==\'xxx\':
pass
elif dbtype==\'mysql\':
try:
db_conn = connstr.split("#");
#conndb=MySQLdb.connect(db=conf.mydb,host=conf.dbip,user=conf.myuser,passwd=conf.mypasswd);
conndb=MySQLdb.connect(user=db_conn[0],passwd=db_conn[1],host=db_conn[2],port=string.atoi(db_conn[3]),db=db_conn[4]);
conndb.clientinfo = \'datasync connection pool from datasync.py\';
conndb.ping();
except Exception, e:
raise \'conn targetdb datasource Excepts,%s!!!(%s).\'%(db_conn[2],str(e))
return None
#mysql如下创建连接池:
connstring="xiaorui.cc#xiaoru.cc#xiaorui.cc#3306#dbname";
mysqlpool=PooledConnection(10,connstring,"mysql");
#获取连接:
mysqlpool.getConnection() 参考技术A 连接池不是几行代码 能搞定的,建议使用成熟的模块——DBUtils
基于python的MySQL和redis数据同步实现(redis做缓存)
一、背景原理
1、MySQL数据库
MySQL是一种关系型数据库,主要用于存放持久化数据,将数据存储在硬盘中,读取速度较慢。每次请求访问数据库时,都存在着I/O操作,如果反复频繁的访问数据库:会在反复链接数据库上花费大量时间,从而导致运行效率过慢;反复的访问数据库也会导致数据库的负载过高。所以,针对MySQL的缺点,衍生出了缓存的概念。
2、redis数据库
redis是一款非关系型数据库,是一种缓存数据库,数据存放在内存中,用于存储使用频繁的数据,这样减少访问数据库的次数,提高运行效率。所以redis数据库读取速度比较快,运行效率高。
3、二者区别与联系
(1) 类型:MySQL是关系型数据库,redis是缓存数据库;
(2) 作用:MySQL用于持久化的存储数据到硬盘,功能强大,速度较慢,基于磁盘,读写redis快,但是不受空间容量限制,性价比高;redis用于存储使用较为频繁的数据到缓存中,读取速度快,基于内存,读写速度快,也可做持久化,但是内存空间有限,当数据量超过内存空间时,需扩充内存,但内存价格贵;
(3) 需求:mysql和redis因为需求的不同,一般都是配合使用。需要高性能的地方使用Redis,不需要高性能的地方使用MySQL。存储数据在MySQL和Redis之间做同步。所以一般情形下,使用MySQL作为持久化存储数据库存储数据,使用redis作为缓存提升读取速度。
二、数据同步实现方案
本文实验,设计一个学生信息表,持久化数据存储在MySQL数据库中,然后利用redis作为缓存数据库,实现数据的快速读取。这样就需要保持redis和MySQL数据库的数据一致性,接下来,主要讲解查询和数据更新过程的数据库一致性实现。
1、查询一致性
查询数据时,由于redis作为缓存实现快速读取数据,所以首先查询redis中是否存在数据,若存在则返回查询结果,若不存在,则向MySQL数据库请求查询数据,然后由MySQL数据库返回结果。查询流程如下如所示。而且,由于本文中redis作为缓存使用,所以需要添加过期时间,也就是为redis的每条数据记录添加过期时间,若过期时间数据没有被查询则清除,若此时间内,数据被查询,则过期时间重置,这样可以定时清除查询不频繁的数据存在redis中,增加数据读取速度。
2、数据更新一致性
更新数据时,如果先更新MySQL数据库,在尚未更新redis时,如果此时有查询进行,则redis返回尚未更新的数据,返回结果有误。所以为了避免这种情况,采用先更新redis,再更新MySQL的方案。也就是,首先查询redis是否存在要更新的数据,存在则清除redis的这条数据,进行重新添加更新,然后再更新MySQL数据库数据,当MySQL更新成功后,再次更新redis,防止MySQL更新失败,而redis更新成功的情况发生;若不存在,则更新MySQL数据库数据,MySQL更新成功后,然后再更新redis。
三、数据库设计
1、MySQL设计
设计一张学生信息表,存储学生学号、姓名、出生日期、电话号码等信息,主键为学生学号。SQL语句如下。
CREATE DATABASE python_mysql_test01;
CREATE TABLE tb_student(
stu_id INT PRIMARY KEY NOT NULL,
stu_name VARCHAR(20) NOT NULL,
stu_birth DATE,
stu_phone VARCHAR(100)
);
2、redis设计
这里redis表的设计采用hash类型的数据,这样可以存在多个key-value对,以用户ID作为hash表的名称,stu_name、stu_birth等作为hash表的键值对,即一个记录:
hmset stu_id:1001 stu_name 'Alice' stu_birth '1990-12-15' stu_phone '15522222141'
同时设置过期时间:expire stu_id:1001 600,即过期时间为10分钟。
四、程序编写
1、查询数据
查询数据,先去缓存redis中查找,如存在数据则返回结果,若不存在,则去MySQL中查找。
# 查询数据
def get_data(self, stu_id):
# redis hash表名称
find_info = 'stu_id:' + str(stu_id)
# 先查询redis数据库是否存在数据,如果存在数据则返回输出,若不存在则去MySQL中查询,然后再将结果更新到redis中
result = self.r0.hgetall(find_info)
# 长度>0 即redis存在查询的信息,直接输出信息,否则redis中不存在,需要查询MySQL
if len(result) > 0:
"""
每次在redis中更新或者写入数据都需要设置过期时间10分钟,然后每查询到一次就重置过期时间10分钟,
若10分钟没有查询到这个数据,就会被清除。这样设置过期时间主要防止redis缓存数据过多,清除不常用缓存数据"""
self.r0.expire(find_info, 600)
print(result)
return result
else:
with self.conn.cursor() as cursor:
try:
# 执行MySQL的查询操作
cursor.execute('select stu_name, stu_birth, stu_phone from tb_student '
'where stu_id=%s', (stu_id,))
result_sql = cursor.fetchall()
print(result_sql)
# 将查询结果更新写入redis数据库中
stu_name, stu_birth, stu_phone = result_sql[0][0], result_sql[0][1], result_sql[0][2]
data_info = 'stu_name': stu_name,
'stu_birth': str(stu_birth),
'stu_phone': stu_phone
self.r0.hmset(find_info, data_info)
self.r0.expire(find_info, 600) # 设置过期时间
return result_sql
except Exception as error:
print(error)
finally:
self.conn.close()
2、更新数据
更新数据,这里主要是以插入数据为例。
"""
更新数据的操作,为了避免更新MySQL后,redis没更新的这一段空挡时间的查询,所以先更新redis,
再更新MySQL,然后MySQL成功提交后,再次对redis进行重新更新
"""
def post_data(self):
# 插入数据
stu_id, stu_name, stu_birth, stu_phone = 1004, 'Tom', '1993-07-04', '19909092332'
# redis hash表名称
find_info = 'stu_id:' + str(stu_id)
# 先查询redis数据库是否存在数据,如果存在数据则更新redis,再更新MySQL,若不存在则去MySQL中更新,提交成功再次更新redis
result = self.r0.hgetall(find_info)
# reids存在数据,则需要对数据进行更新,即先清除再写入; 写入redis后,再将数据写入MySQL
if len(result) > 0:
# 清除数据
all_keys = self.r0.hkeys(find_info)
self.r0.hdel(find_info, *all_keys)
data_info = 'stu_name': stu_name,
'stu_birth': stu_birth,
'stu_phone': stu_phone
self.r0.hmset(find_info, data_info)
self.r0.expire(find_info, 600) # 设置过期时间
with self.conn.cursor() as cursor:
try:
# 插入SQL语句,result为返回的结果
res_info = cursor.execute(
'insert into tb_student values (%s, %s, %s, %s)', (stu_id, stu_name, stu_birth, stu_phone,)
)
# 成功插入后需要提交才能同步在数据库中
if isinstance(res_info, int):
print('数据更新成功')
self.conn.commit()
all_keys = self.r0.hkeys(find_info)
# 再次更新redis
self.r0.hdel(find_info, *all_keys)
self.r0.hmset(find_info, data_info)
self.r0.expire(find_info, 600) # 设置过期时间
except MySQLError as error:
# 如果MySQL提交不成功,清除redis数据
all_keys = self.r0.hkeys(find_info)
self.r0.hdel(find_info, *all_keys)
print(error)
self.conn.rollback()
finally:
# 操作执行完成后,需要关闭连接
self.conn.close()
else:
with self.conn.cursor() as cursor:
try:
# 插入SQL语句,result为返回的结果
res_info = cursor.execute(
'insert into tb_student values (%s, %s, %s, %s)', (stu_id, stu_name, stu_birth, stu_phone,)
)
# 成功插入后需要提交才能同步在数据库中
if isinstance(res_info, int):
print('数据更新成功')
self.conn.commit()
except MySQLError as error:
print(error)
self.conn.rollback()
finally:
# 操作执行完成后,需要关闭连接
self.conn.close()
附录代码:
import pymysql
import redis
from pymysql import MySQLError
import time, datetime
class DatabaseSync:
def __init__(self):
# 连接MySQL数据库
try:
self.conn = pymysql.connect(host='1.1.1.1', port=3306,
user='root', password='111111',
database='python_mysql_test01', charset='utf8')
except Exception as error:
print('连接MySQL出现问题!')
print('失败原因:', error)
exit()
try:
# 建立redis连接池
self.conn_pool = redis.ConnectionPool(host='1.1.1.1', port=6379, db=0, decode_responses=True,
password='111111')
# 客户端0连接数据库
self.r0 = redis.StrictRedis(connection_pool=self.conn_pool)
except Exception as error:
print('连接redis出现问题!')
print('失败原因:', error)
exit()
# 查询数据
def get_data(self, stu_id):
# redis hash表名称
find_info = 'stu_id:' + str(stu_id)
# 先查询redis数据库是否存在数据,如果存在数据则返回输出,若不存在则去MySQL中查询,然后再将结果更新到redis中
result = self.r0.hgetall(find_info)
# 长度>0 即redis存在查询的信息,直接输出信息,否则redis中不存在,需要查询MySQL
if len(result) > 0:
"""
每次在redis中更新或者写入数据都需要设置过期时间10分钟,然后每查询到一次就重置过期时间10分钟,
若10分钟没有查询到这个数据,就会被清除。这样设置过期时间主要防止redis缓存数据过多,清除不常用缓存数据"""
self.r0.expire(find_info, 600)
print(result)
return result
else:
with self.conn.cursor() as cursor:
try:
# 执行MySQL的查询操作
cursor.execute('select stu_name, stu_birth, stu_phone from tb_student '
'where stu_id=%s', (stu_id,))
result_sql = cursor.fetchall()
print(result_sql)
# 将查询结果更新写入redis数据库中
stu_name, stu_birth, stu_phone = result_sql[0][0], result_sql[0][1], result_sql[0][2]
data_info = 'stu_name': stu_name,
'stu_birth': str(stu_birth),
'stu_phone': stu_phone
self.r0.hmset(find_info, data_info)
self.r0.expire(find_info, 600) # 设置过期时间
return result_sql
except Exception as error:
print(error)
finally:
self.conn.close()
"""
更新数据的操作,为了避免更新MySQL后,redis没更新的这一段空挡时间的查询,所以先更新redis,
再更新MySQL,然后MySQL成功提交后,再次对redis进行重新更新
"""
def post_data(self):
# 插入数据
stu_id, stu_name, stu_birth, stu_phone = 1004, 'Tom', '1993-07-04', '19909092332'
# redis hash表名称
find_info = 'stu_id:' + str(stu_id)
# 先查询redis数据库是否存在数据,如果存在数据则更新redis,再更新MySQL,若不存在则去MySQL中更新,提交成功再次更新redis
result = self.r0.hgetall(find_info)
# reids存在数据,则需要对数据进行更新,即先清除再写入; 写入redis后,再将数据写入MySQL
if len(result) > 0:
# 清除数据
all_keys = self.r0.hkeys(find_info)
self.r0.hdel(find_info, *all_keys)
data_info = 'stu_name': stu_name,
'stu_birth': stu_birth,
'stu_phone': stu_phone
self.r0.hmset(find_info, data_info)
self.r0.expire(find_info, 600) # 设置过期时间
with self.conn.cursor() as cursor:
try:
# 插入SQL语句,result为返回的结果
res_info = cursor.execute(
'insert into tb_student values (%s, %s, %s, %s)', (stu_id, stu_name, stu_birth, stu_phone,)
)
# 成功插入后需要提交才能同步在数据库中
if isinstance(res_info, int):
print('数据更新成功')
self.conn.commit()
all_keys = self.r0.hkeys(find_info)
# 再次更新redis
self.r0.hdel(find_info, *all_keys)
self.r0.hmset(find_info, data_info)
self.r0.expire(find_info, 600) # 设置过期时间
except MySQLError as error:
# 如果MySQL提交不成功,清除redis数据
all_keys = self.r0.hkeys(find_info)
self.r0.hdel(find_info, *all_keys)
print(error)
self.conn.rollback()
finally:
# 操作执行完成后,需要关闭连接
self.conn.close()
else:
with self.conn.cursor() as cursor:
try:
# 插入SQL语句,result为返回的结果
res_info = cursor.execute(
'insert into tb_student values (%s, %s, %s, %s)', (stu_id, stu_name, stu_birth, stu_phone,)
)
# 成功插入后需要提交才能同步在数据库中
if isinstance(res_info, int):
print('数据更新成功')
self.conn.commit()
except MySQLError as error:
print(error)
self.conn.rollback()
finally:
# 操作执行完成后,需要关闭连接
self.conn.close()
if __name__ == '__main__':
dbs = DatabaseSync()
# dbs.get_data(1003)
dbs.post_data()
以上是关于如何实现python的mysql连接池并加入缓存过期的主要内容,如果未能解决你的问题,请参考以下文章
基于python的MySQL和redis数据同步实现(redis做缓存)
基于python的MySQL和redis数据同步实现(redis做缓存)
基于python的MySQL和redis数据同步实现(redis做缓存)