mysql连接池实现
Posted ?养花怪兽
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mysql连接池实现相关的知识,希望对你有一定的参考价值。
代码如下:
import pymysql
import logging
import traceback
import threading
if __name__ == \'__main__\':
from config import mysql_conf
else:
from config.config import mysql_conf
from dbutils.pooled_db import PooledDB
# MySQL连接池
class MySQLPool(object):
# 类变量
pool = PooledDB(creator=pymysql, **mysql_conf)
print("创建数据库连接池 >>>", id(pool))
# with上下文
def __enter__(self):
self.conn = self.pool.connection()
self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
# 记得return self
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# 关闭连接池
self.cursor.close()
self.conn.close()
# 插入或修改操作
def insert_or_update(self, sql):
try:
self.cursor.execute(sql)
rowid = self.cursor.lastrowid
self.conn.commit()
return rowid
except Exception as error:
print(traceback.format_exc())
# 回滚
self.conn.rollback()
# 简单的日志处理
print(error)
# logging.error("=======ERROR=======\\n%s\\nsql:%s" % (error, sql))
raise
# 插入或修改操作
def insert_many(self, sql, data):
try:
self.cursor.executemany(sql, data)
self.conn.commit()
return 1
except Exception as error:
print(traceback.format_exc())
# 回滚
self.conn.rollback()
# 简单的日志处理
print(error)
# logging.error("=======ERROR=======\\n%s\\nsql:%s" % (error, sql))
raise
# 查询操作
def query(self, sql):
try:
self.cursor.execute(sql)
results = self.cursor.fetchall()
return results
except Exception as error:
# 简单的日志处理
print(error)
# logging.error("=======ERROR=======:\\n%s\\nsql:%s" % (error, sql))
raise
if __name__ == \'__main__\':
mysql = MySQLPool()
直接调用即可
with MySQLPool() as poolHandler:
# 插入position_pool
poolHandler.insert_many(sql_insert_pool, data)
# 插入position_increase
poolHandler.insert_many(sql_insert_increase, data)
print("创建数据库连接池 >>>", id(pool)),打印这一行代码其实就知道是不是单个连接池了
以上是关于mysql连接池实现的主要内容,如果未能解决你的问题,请参考以下文章