Python操作MySQL
Posted 麻 木
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python操作MySQL相关的知识,希望对你有一定的参考价值。
本篇对于Python操作MySQL主要使用两种方式:
- 原生模块 pymsql
- ORM框架 SQLAchemy
pymysql
pymsql是Python中操作MySQL的模块,其使用方法和MySQLdb几乎相同。
下载安装
pip3 install pymysql
使用操作
1、执行SQL
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql # 创建连接 conn = pymysql.connect(host=\'127.0.0.1\', port=3306, user=\'root\', passwd=\'123\', db=\'t1\') # 创建游标 cursor = conn.cursor() # 执行SQL,并返回收影响行数 effect_row = cursor.execute("update hosts set host = \'1.1.1.2\'") # 执行SQL,并返回受影响行数 #effect_row = cursor.execute("update hosts set host = \'1.1.1.2\' where nid > %s", (1,)) # 执行SQL,并返回受影响行数 #effect_row = cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)]) # 提交,不然无法保存新建或者修改的数据 conn.commit() # 关闭游标 cursor.close() # 关闭连接 conn.close()
2、获取新创建数据自增ID
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql conn = pymysql.connect(host=\'127.0.0.1\', port=3306, user=\'root\', passwd=\'123\', db=\'t1\') cursor = conn.cursor() cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)]) conn.commit() cursor.close() conn.close() # 获取最新自增ID new_id = cursor.lastrowid
3、获取查询数据
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql conn = pymysql.connect(host=\'127.0.0.1\', port=3306, user=\'root\', passwd=\'123\', db=\'t1\') cursor = conn.cursor() cursor.execute("select * from hosts") # 获取第一行数据 row_1 = cursor.fetchone() # 获取前n行数据 # row_2 = cursor.fetchmany(3) # 获取所有数据 # row_3 = cursor.fetchall() conn.commit() cursor.close() conn.close()
注:在fetch数据时按照顺序进行,可以使用cursor.scroll(num,mode)来移动游标位置,如:
- cursor.scroll(1,mode=\'relative\') # 相对当前位置移动
- cursor.scroll(2,mode=\'absolute\') # 相对绝对位置移动
4、fetch数据类型
关于默认获取的数据是元祖类型,如果想要或者字典类型的数据,即:
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql conn = pymysql.connect(host=\'127.0.0.1\', port=3306, user=\'root\', passwd=\'123\', db=\'t1\') # 游标设置为字典类型 cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) r = cursor.execute("call p1()") result = cursor.fetchone() conn.commit() cursor.close() conn.close()
三 增、删、改:conn.commit()
import pymysql #链接 conn=pymysql.connect(host=\'localhost\',user=\'root\',password=\'123\',database=\'egon\') #游标 cursor=conn.cursor() #执行sql语句 #part1 # sql=\'insert into userinfo(name,password) values("root","123456");\' # res=cursor.execute(sql) #执行sql语句,返回sql影响成功的行数 # print(res) #part2 # sql=\'insert into userinfo(name,password) values(%s,%s);\' # res=cursor.execute(sql,("root","123456")) #执行sql语句,返回sql影响成功的行数 # print(res) #part3 sql=\'insert into userinfo(name,password) values(%s,%s);\' res=cursor.executemany(sql,[("root","123456"),("lhf","12356"),("eee","156")]) #执行sql语句,返回sql影响成功的行数 print(res) conn.commit() #提交后才发现表中插入记录成功 cursor.close() conn.close(
四 查:fetchone,fetchmany,fetchall
import pymysql #链接 conn=pymysql.connect(host=\'localhost\',user=\'root\',password=\'123\',database=\'egon\') #游标 cursor=conn.cursor() #执行sql语句 sql=\'select * from userinfo;\' rows=cursor.execute(sql) #执行sql语句,返回sql影响成功的行数rows,将结果放入一个集合,等待被查询 # cursor.scroll(3,mode=\'absolute\') # 相对绝对位置移动 # cursor.scroll(3,mode=\'relative\') # 相对当前位置移动 res1=cursor.fetchone() res2=cursor.fetchone() res3=cursor.fetchone() res4=cursor.fetchmany(2) res5=cursor.fetchall() print(res1) print(res2) print(res3) print(res4) print(res5) print(\'%s rows in set (0.00 sec)\' %rows) conn.commit() #提交后才发现表中插入记录成功 cursor.close() conn.close() \'\'\' (1, \'root\', \'123456\') (2, \'root\', \'123456\') (3, \'root\', \'123456\') ((4, \'root\', \'123456\'), (5, \'root\', \'123456\')) ((6, \'root\', \'123456\'), (7, \'lhf\', \'12356\'), (8, \'eee\', \'156\')) rows in set (0.00 sec) \'\'\'
SQLAchemy
SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
安装:
pip3 install SQLAlchemy
SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html
一、内部处理
使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5) # 执行SQL # cur = engine.execute( # "INSERT INTO hosts (host, color_id) VALUES (\'1.1.1.22\', 3)" # ) # 新插入行自增ID # cur.lastrowid # 执行SQL # cur = engine.execute( # "INSERT INTO hosts (host, color_id) VALUES(%s, %s)",[(\'1.1.1.22\', 3),(\'1.1.1.221\', 3),] # ) # 执行SQL # cur = engine.execute( # "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)", # host=\'1.1.1.99\', color_id=3 # ) # 执行SQL # cur = engine.execute(\'select * from hosts\') # 获取第一行数据 # cur.fetchone() # 获取第n行数据 # cur.fetchmany(3) # 获取所有数据 # cur.fetchall()
二、ORM功能使用
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。
1、创建表
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5) Base = declarative_base() # 创建单表 class Users(Base): __tablename__ = \'users\' id = Column(Integer, primary_key=True) name = Column(String(32)) extra = Column(String(16)) __table_args__ = ( UniqueConstraint(\'id\', \'name\', name=\'uix_id_name\'), Index(\'ix_id_name\', \'name\', \'extra\'), ) # 一对多 class Favor(Base): __tablename__ = \'favor\' nid = Column(Integer, primary_key=True) caption = Column(String(50), default=\'red\', unique=True) class Person(Base): __tablename__ = \'person\' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) favor_id = Column(Integer, ForeignKey("favor.nid")) # 多对多 class Group(Base): __tablename__ = \'group\' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) port = Column(Integer, default=22) class Server(Base): __tablename__ = \'server\' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) class ServerToGroup(Base): __tablename__ = \'servertogroup\' nid = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey(\'server.id\')) group_id = Column(Integer, ForeignKey(\'group.id\')) def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine)
注:设置外检的另一种方式 ForeignKeyConstraint([\'other_id\'], [\'othertable.other_id\'])
2、操作表
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5) Base = declarative_base() # 创建单表 class Users(Base): __tablename__ = \'users\' id = Column(Integer, primary_key=True) name = Column(String(32)) extra = Column(String(16)) __table_args__ = ( UniqueConstraint(\'id\', \'name\', name=\'uix_id_name\'), Index(\'ix_id_name\', \'name\', \'extra\'), ) def __repr__(self): return "%s-%s" %(self.id, self.name) # 一对多 class Favor(Base): __tablename__ = \'favor\' nid = Column(Integer, primary_key=True) caption = Column(String(50), default=\'red\', unique=True) def __repr__(self): return "%s-%s" %(self.nid, self.caption) class Person(Base): __tablename__ = \'person\' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) favor_id = Column(Integer, ForeignKey("favor.nid")) # 与生成表结构无关,仅用于查询方便 favor = relationship("Favor", backref=\'pers\') # 多对多 class ServerToGroup(Base): __tablename__ = \'servertogroup\' nid = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey(\'server.id\')) group_id = Column(Integer, ForeignKey(\'group.id\')) group = relationship("Group", backref=\'s2g\') server = relationship("Server", backref=\'s2g\') class Group(Base): __tablename__ = \'group\' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) port = Column(Integer, default=22) # group = relationship(\'Group\',secondary=ServerToGroup,backref=\'host_list\') class Server(Base): __tablename__ = \'server\' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) Session = sessionmaker(bind=engine) session = Session()
obj = Users(name="alex0", extra=\'sb\') session.add(obj) session.add_all([ Users(name="alex1", extra=\'sb\'), Users(name="alex2", extra=\'sb\'), ]) session.commit()
session.query(Users).filter(Users.id > 2).delete()
session.commit()
session.query(Users).filter(Users.id > 2).update({"name" : "099"}) session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False) session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate") session.commit(
ret = session.query(Users).all() ret = session.query(Users.name, Users.extra).all() ret = session.query(Users).filter_by(name=\'alex\').all() ret = session.query(Users).filter_by(name=\'alex\').first() ret = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name=\'fred\').order_by(User.id).all() ret = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name=\'ed\').all()
# 条件 ret = session.query(Users).filter_by(name=\'alex\').all() ret = session.query(Users).filter(Users.id > 1, Users.name == \'eric\').all() ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == \'eric\').all() ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name=\'eric\'))).all() from sqlalchemy import and_, or_ ret = session.query(Users).filter(and_(Users.id > 3, Users.name == \'eric\')).all() ret = session.query(Users).filter(or_(Users.id < 2, Users.name == \'eric\')).all() ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == \'eric\', Users.id > 3), Users.extra != "" )).all() # 通配符 ret = session.query(Users).filter(Users.name.like(\'e%\')).all() ret = session.query(Users).filter(~Users.name.like(\'e%\')).all() # 限制 ret = session.query(Users)[1:2] # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分组 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 连表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() ret = session.query(Person).join(Favor, isouter=True).all() # 组合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all()
以上是关于Python操作MySQL的主要内容,如果未能解决你的问题,请参考以下文章