Python SqlAlchemy使用方法
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python SqlAlchemy使用方法相关的知识,希望对你有一定的参考价值。
1.初始化连接:
import os

from sqlalchemy import (
    Column,
    ForeignKey,
    Index,
    Integer,
    String,
    UniqueConstraint,
    create_engine,
)
from sqlalchemy.orm import relationship, sessionmaker

try:
    # SQLAlchemy 1.4+ exposes declarative_base from sqlalchemy.orm.
    from sqlalchemy.orm import declarative_base
except ImportError:
    # Older releases only provide it under sqlalchemy.ext.declarative.
    from sqlalchemy.ext.declarative import declarative_base

# Directory containing this module; database paths are joined onto it.
basedir = os.path.abspath(os.path.dirname(__file__))

# Base class for the declarative ORM models.
Base = declarative_base()
2.创建表:
# Single-table model.
class Users(Base):
    """ORM model mapped to the ``users`` table."""

    # Table name
    __tablename__ = 'users'

    # Table structure:
    id = Column(Integer, primary_key=True)
    username = Column(String(32))
    password = Column(String(16))
初始化数据库:
def __init__(self, username, password):
    """Store the given username and password on the new instance."""
    self.username, self.password = username, password
创建联合唯一索引
# General form of __table_args__:
#
#     __table_args__ = (
#         UniqueConstraint("column_1", "column_2", name="constraint_name"),
#         Index("index_name", "column_1", "column_2"),
#     )
#
# NOTE(review): ``ix_id_name`` covers username/password rather than id;
# confirm whether the name or the column list is the intended one.
__table_args__ = (
    UniqueConstraint('id', 'username', name='uix_id_name'),
    Index('ix_id_name', 'username', 'password'),
)
3.创建映射
# ORM helper bound to one SQLite database file.
class UserORMHelper(object):
    def __init__(self, database_name):
        """Create an engine for the SQLite file ``database_name``.

        The file path is ``database_name`` joined onto ``basedir``. No
        session is opened here; ``create_db`` assigns ``self.session``.
        """
        database_path = os.path.join(basedir, database_name)
        self.engine = create_engine('sqlite:///' + database_path)
创建表
def create_db(self):
    """Create all tables registered on ``Base`` and open a session."""
    Base.metadata.create_all(self.engine)
    # Bind a session factory to the engine, then keep one session around.
    session_factory = sessionmaker(bind=self.engine)
    self.session = session_factory()
删除表
def drop_db(self):
    """Drop every table registered on ``Base.metadata`` from the engine."""
    metadata = Base.metadata
    metadata.drop_all(self.engine)
4.接下来就是简单的针对数据库的增删改查操作了。为了方便测试,这些操作我按照自己的需要做了些修改,后面我会附上测试的代码。
插入数据
def addUser(self, users):
    """Insert ``users`` unless an identical username/password row exists.

    Returns:
        True when the row was added and committed, False when
        ``query_all_with_user_name_password`` already finds a match.

    Raises:
        Whatever ``session.commit()`` raises; the session is rolled back
        first so it stays usable for later calls.
    """
    # Already registered: leave the session untouched.
    if self.query_all_with_user_name_password(users):
        return False
    self.session.add(users)
    try:
        self.session.commit()
    except Exception:
        # A failed flush leaves the transaction inactive until rolled back.
        self.session.rollback()
        raise
    return True
删除数据
def delete(self, users):
    """Delete the first row whose username equals ``users.username``.

    Returns:
        True when a row was deleted and committed, False when no row
        with that username exists.

    Raises:
        Whatever ``session.commit()`` raises; the session is rolled back
        first.
    """
    target = self.session.query(Users).filter_by(username=users.username).first()
    if target is None:
        # session.delete(None) would raise; report "nothing deleted" instead.
        return False
    self.session.delete(target)
    try:
        self.session.commit()
    except Exception:
        self.session.rollback()
        raise
    return True
修改
(1)匹配username修改password
def update_user_extra_by_user_name(self, users):
    """Set ``password`` to ``users.password`` on rows matching ``users.username``.

    Returns:
        True once the update is committed, whether or not any row matched.

    Raises:
        Whatever the update or commit raises; the session is rolled back
        first.
    """
    matching = self.session.query(Users).filter(Users.username == users.username)
    try:
        matching.update({"password": users.password}, synchronize_session='evaluate')
        self.session.commit()
    except Exception:
        self.session.rollback()
        raise
    return True
(2)匹配password修改username
def update_user_name_by_user_extra(self, users):
    """Set ``username`` to ``users.username`` on rows matching ``users.password``.

    Returns:
        True once the update is committed, whether or not any row matched.

    Raises:
        Whatever the update or commit raises; the session is rolled back
        first.
    """
    matching = self.session.query(Users).filter(Users.password == users.password)
    try:
        matching.update({"username": users.username}, synchronize_session='evaluate')
        self.session.commit()
    except Exception:
        self.session.rollback()
        raise
    return True
查询
按password查询所有
def query_all_with_user_extra(self, users):
    """Return every ``Users`` row whose password equals ``users.password``."""
    same_password = self.session.query(Users).filter_by(password=users.password)
    return same_password.all()
按username查询所有,但是只取其一
def query_one(self, users):
    """Return a list holding at most one row matching ``users.username``."""
    by_name = self.session.query(Users).filter_by(username=users.username)
    rows = by_name.limit(1).all()
    # NOTE(review): the other read-only queries do not commit; confirm
    # whether this commit is actually needed.
    self.session.commit()
    return rows
按username查询,但是只取第一条数据
def query_first_with_user_name(self, users):
    """Return the first row matching ``users.username``, or None if there is none."""
    by_name = self.session.query(Users).filter_by(username=users.username)
    return by_name.first()
按username、password查询所有
def query_all_with_user_name_password(self, users):
    """Return every row matching both ``users.username`` and ``users.password``."""
    criteria = {"username": users.username, "password": users.password}
    return self.session.query(Users).filter_by(**criteria).all()
按username查询所有
def query_all_with_user_name(self, users):
    """Return every ``Users`` row whose username equals ``users.username``."""
    same_name = self.session.query(Users).filter_by(username=users.username)
    return same_name.all()
下面的是数据库的测试代码,写测试我用的是TDD
import unittest

from UserORM import Users, UserORMHelper

# Dedicated database file for the test run, so "user.db" is never touched.
TEST_DB = 'test.db'


class UserORMHelperTestCase(unittest.TestCase):
    """Exercise the add/delete/update/query operations of UserORMHelper."""

    def setUp(self):
        """Set up a blank temp database before each test"""
        self.helper = UserORMHelper(TEST_DB)
        self.helper.drop_db()
        self.helper.create_db()

    def tearDown(self):
        """Destroy blank temp database after each test"""
        # Release the session's connection before dropping the tables.
        self.helper.session.close()
        self.helper.drop_db()

    def test_add(self):
        # Insert: the second identical username/password pair is rejected.
        self.assertTrue(self.helper.addUser(Users("tome", "tom jobn")))
        self.assertFalse(self.helper.addUser(Users("tome", "tom jobn")))

    def test_delete(self):
        # Delete a user that was just inserted.
        self.assertTrue(self.helper.addUser(Users("tome", "tom jobn")))
        self.assertTrue(self.helper.delete(Users("tome", "tom jobn")))

    def test_query_all(self):
        # Query all rows sharing one username.
        for password in ("tom jobn", "tom jobn2", "tom jobn3"):
            self.assertTrue(self.helper.addUser(Users("tome", password)))
        user_list = self.helper.query_all_with_user_name(Users("tome", "tom jobn"))
        self.assertEqual(len(user_list), 3)

    def test_query_none(self):
        # Query the first row for a username that was never inserted.
        user = self.helper.query_first_with_user_name(Users("tom", "tom job"))
        self.assertEqual(user, None)

    def test_query_one(self):
        # Query by username but fetch a single row only.
        for password in ("tom jobn", "tom jobn2", "tom jobn3"):
            self.assertTrue(self.helper.addUser(Users("tome", password)))
        user_list = self.helper.query_one(Users("tome", "tom jobn4"))
        self.assertEqual(len(user_list), 1)

    def test_revise_extra(self):
        # Update the password of the row matching a username.
        self.assertTrue(self.helper.addUser(Users("tome", "tom jobn")))
        self.assertTrue(
            self.helper.update_user_extra_by_user_name(Users("tome", "tomUpdate"))
        )
        user_list = self.helper.query_all_with_user_name(Users("tome", "tom job"))
        self.assertEqual(len(user_list), 1)
        # The stored password must be the new one, not merely still present.
        self.assertEqual(user_list[0].password, "tomUpdate")

    def test_recise_name(self):
        # Update the username of the row matching a password.
        self.assertTrue(self.helper.addUser(Users("tome", "tom jobn")))
        self.assertTrue(
            self.helper.update_user_name_by_user_extra(Users("tomeUpdate", "tom jobn"))
        )
        user_list = self.helper.query_all_with_user_name(Users("tomeUpdate", "tom job"))
        self.assertEqual(len(user_list), 1)


if __name__ == '__main__':
    unittest.main()
以上是关于Python SqlAlchemy使用方法的主要内容,如果未能解决你的问题,请参考以下文章
from flask_sqlalchemy import SQLAlchemy 无法创建数据库表,代码无报错,代码如下,python版本是3.5