pymysql模块

Posted 桥前石头

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了pymysql模块相关的知识,希望对你有一定的参考价值。

安装pymsql模块

pip install pymysql

使用前准备


在进行本文以下内容之前需要注意:
1.你有一个MySQL数据库,并且已经启动。
2.你有可以连接该数据库的用户名和密码
3.你有一个有权限操作的database

基于pymsql模块的数据库操作类(单例)

from conf import setting
import pymysql

class Mysql:
    __instance = None
    def __init__(self):
        self.conn = pymysql.connect(
            host=setting.host,  # 数据库主机
            user=setting.user,  # 数据库用户
            password=setting.password,  # 数据库用户的密码
            database=setting.database,  # 指定数据库
            charset=setting.charset,  # 指定字符编码
            autocommit=setting.autocommit  # 自动提交
            )
        self.cursor =self.conn.cursor(
            cursor=pymysql.cursors.DictCursor  #用字典装数据
            )
    # 关数据库
    def close_db(self):
        self.conn.close()
    # 查看数据
    def select(self, sql, args=None):
        self.cursor.execute(sql, args)
        rs = self.cursor.fetchall()
        return rs

    def execute(self, sql, args):
        try:
            self.cursor.execute(sql, args)
            affected = self.cursor.rowcount
            # self.conn.commit()
        except BaseException as e:
            print(e)
        return affected

    @classmethod
    def singleton(cls):
        if not cls.__instance:
            cls.__instance = cls()
        return cls.__instance

基于数据库连接池实现的数据库操作类

db_pool.py

import pymysql
from conf import setting
from DBUtils.PooledDB import PooledDB

POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=6,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=3,
    # 链接池中最多共享的链接数量,0和None表示全部共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。
    ping=0,
    # ping MySQL服务端,检查是否服务可用。

    host=setting.host,
    port=setting.port,
    user=setting.user,
    password=setting.password,
    database=setting.database,
    charset=setting.charset,
    autocommit=setting.autocommit
)

mysql_pool.py

import pymysql
from ormpool import db_pool
from threading import current_thread


class MysqlPool:
    def __init__(self):
        self.conn = db_pool.POOL.connection()
        # print(db_pool.POOL)
        # print(current_thread().getName(), '拿到连接', self.conn)
        # print(current_thread().getName(), '池子里目前有', db_pool.POOL._idle_cache, '
')
        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)# 得到一个可以执行SQL语句并且将结果作为字典返回的游标

    def close_db(self):
        self.cursor.close()
        self.conn.close()

    def select(self, sql, args=None):
        self.cursor.execute(sql, args)
        rs = self.cursor.fetchall()
        return rs

    def execute(self, sql, args):

        try:
            self.cursor.execute(sql, args)
            affected = self.cursor.rowcount
            # self.conn.commit()
        except BaseException as e:
            print(e)
        finally:
            self.close_db()
        return affected

setting.py

host = '127.0.0.1'
port = 3306
user = 'root'
password = 'root'
database = 'db1'
charset = 'utf8'
autocommit = True

增删改查操作

增加数据

# 导入pymysql模块
import pymysql
# 连接database
conn = pymysql.connect(host=“你的数据库地址”, user=“用户名”,password=“密码”,database=“数据库名”,charset=“utf8”)
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()
sql = "INSERT INTO USER1(name, age) VALUES (%s, %s);"
username = "Alex"
age = 18
# 执行SQL语句
cursor.execute(sql, [username, age])
# 提交事务
conn.commit()
cursor.close()
conn.close()

插入数据失败,回滚

# 导入pymysql模块
import pymysql
# 连接database
conn = pymysql.connect(host=“你的数据库地址”, user=“用户名”,password=“密码”,database=“数据库名”,charset=“utf8”)
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()
sql = "INSERT INTO USER1(name, age) VALUES (%s, %s);"
username = "Alex"
age = 18
try:
    # 执行SQL语句
    cursor.execute(sql, [username, age])
    # 提交事务
    conn.commit()
except Exception as e:
    # 有异常,回滚事务
    conn.rollback()
cursor.close()
conn.close()

获取插入数据的ID

# 导入pymysql模块
import pymysql
# 连接database
conn = pymysql.connect(host=“你的数据库地址”, user=“用户名”,password=“密码”,database=“数据库名”,charset=“utf8”)
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()
sql = "INSERT INTO USER1(name, age) VALUES (%s, %s);"
username = "Alex"
age = 18
try:
    # 执行SQL语句
    cursor.execute(sql, [username, age])
    # 提交事务
    conn.commit()
    # 提交之后,获取刚插入的数据的ID
    last_id = cursor.lastrowid
except Exception as e:
    # 有异常,回滚事务
    conn.rollback()
cursor.close()
conn.close()

批量插入数据

# 导入pymysql模块
import pymysql
# 连接database
conn = pymysql.connect(host=“你的数据库地址”, user=“用户名”,password=“密码”,database=“数据库名”,charset=“utf8”)
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()
sql = "INSERT INTO USER1(name, age) VALUES (%s, %s);"
data = [("Alex", 18), ("Egon", 20), ("Yuan", 21)]
try:
    # 批量执行多条插入SQL语句
    cursor.executemany(sql, data)
    # 提交事务
    conn.commit()
except Exception as e:
    # 有异常,回滚事务
    conn.rollback()
cursor.close()
conn.close()

删除数据

# 导入pymysql模块
import pymysql
# 连接database
conn = pymysql.connect(host=“你的数据库地址”, user=“用户名”,password=“密码”,database=“数据库名”,charset=“utf8”)
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()
sql = "DELETE FROM USER1 WHERE id=%s;"
try:
    cursor.execute(sql, [4])
    # 提交事务
    conn.commit()
except Exception as e:
    # 有异常,回滚事务
    conn.rollback()
cursor.close()
conn.close()

修改数据

# 导入pymysql模块
import pymysql
# 连接database
conn = pymysql.connect(host=“你的数据库地址”, user=“用户名”,password=“密码”,database=“数据库名”,charset=“utf8”)
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()
# 修改数据的SQL语句
sql = "UPDATE USER1 SET age=%s WHERE name=%s;"
username = "Alex"
age = 80
try:
    # 执行SQL语句
    cursor.execute(sql, [age, username])
    # 提交事务
    conn.commit()
except Exception as e:
    # 有异常,回滚事务
    conn.rollback()
cursor.close()
conn.close()

查看数据

查看单条数据

# 导入pymysql模块
import pymysql
# 连接database
conn = pymysql.connect(host=“你的数据库地址”, user=“用户名”,password=“密码”,database=“数据库名”,charset=“utf8”)
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()
# 查询数据的SQL语句
sql = "SELECT id,name,age from USER1 WHERE id=1;"
# 执行SQL语句
cursor.execute(sql)
# 获取单条查询数据
ret = cursor.fetchone()
cursor.close()
conn.close()
# 打印下查询结果
print(ret)

查看多条数据

# 导入pymysql模块
import pymysql
# 连接database
conn = pymysql.connect(host=“你的数据库地址”, user=“用户名”,password=“密码”,database=“数据库名”,charset=“utf8”)
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()
# 查询数据的SQL语句
sql = "SELECT id,name,age from USER1;"
# 执行SQL语句
cursor.execute(sql)
# 获取多条查询数据
ret = cursor.fetchall()
cursor.close()
conn.close()
# 打印下查询结果
print(ret)

补充

# 可以获取指定数量的数据
cursor.fetchmany(3)
# 光标按绝对位置移动1
cursor.scroll(1, mode="absolute")
# 光标按照相对位置(当前位置)移动1
cursor.scroll(1, mode="relative")

以上是关于pymysql模块的主要内容,如果未能解决你的问题,请参考以下文章

Maven类路径错误多个SLF4J绑定

slf4j+logback搭建超实用的日志管理模块

开发经验SpringBoot日志SLF4j+Logback日志模块化

CTS测试CtsWindowManagerDeviceTestCases模块的testShowWhenLockedImeActivityAndShowSoftInput测试fail项解决方法(代码片段

如何将字符串数据从活动发送到片段?

slf4j与logback的结合使用