Python SqlAlchemy使用方法

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python SqlAlchemy使用方法相关的知识,希望对你有一定的参考价值。

    Python SqlAlchemy使用方法

 2.初始化连接:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
import os
basedir = os.path.abspath(os.path.dirname(__file__))

# 创建对象的基类:
Base = declarative_base()

 

 2.创建表:

# 创建单表
class Users(Base):
# 表名
__tablename__ = ‘users‘
# 表的结构:
id = Column(Integer, primary_key=True)
username = Column(String(32))
password = Column(String(16))

 

 初始化数据库:

 def __init__(self, username, password):
# 初始化数据库
self.username = username
self.password = password

创建联合唯一索引

# __table_args__(
# UniqueConstraint("列名1","列名2","联合唯一索引名"),
# index("索引名","列名1","列名2"),
# ) #创建联合唯一索引
__table_args__ = (
UniqueConstraint(‘id‘, ‘username‘, name=‘uix_id_name‘),
Index(‘ix_id_name‘, ‘username‘, ‘password‘),
)

 3.创建映射

# 创建映射
class UserORMHelper(object):
def __init__(self, database_name):
DATABASE_PATH = os.path.join(basedir, database_name)
SQLALCHEMY_DATABASE_URI = ‘sqlite:///‘ + DATABASE_PATH
self.engine = create_engine(SQLALCHEMY_DATABASE_URI)

 创建表

def create_db(self):
# 创建表
Base.metadata.create_all(self.engine) # 创建表
Session = sessionmaker(bind=self.engine)
self.session = Session()

 删除表

def drop_db(self):
# 删除表
Base.metadata.drop_all(self.engine) #删除表

 4.接下来就是简单的针对数据库的增删改查操作了,这些操作我是为了方便测试安装自己的做了些修改,后面我会附上测试的代码

   插入数据

 

def addUser(self, users):
# 插入数据
isSucess=False
usrsList=self.query_all_with_user_name_password(users)
if(usrsList and len(usrsList)>0):#用户已经注册
return False
self.session.add(users)
self.session.commit()
isSucess=True
return isSucess

 

删除数据
def delete(self,users):
# 删除数据
isSucess=False
user1 = self.session.query(Users).filter_by(username=users.username).first()
self.session.delete(user1)
self.session.commit()
isSucess=True
return isSucess

 修改

(1)匹配username修改password

def update_user_extra_by_user_name(self, users):
# 匹配并且修改password
isSucess = False
self.session.query(Users).filter(Users.username == users.username).update({"password": users.password}, synchronize_session=‘evaluate‘)
self.session.commit()
isSucess=True
return isSucess

 (2)匹配password修改username

def update_user_name_by_user_extra(self, users):
# 匹配并且修改name
isSucess = False
self.session.query(Users).filter(Users.password == users.password).update({"username": users.username}, synchronize_session=‘evaluate‘)
self.session.commit()
isSucess=True
return isSucess

 

查询

安password查询所有

def query_all_with_user_extra(self, users):
# 查询相同password数据
userList = self.session.query(Users).filter_by(password=users.password).all()
return userList

 安username查询所有,但是只取其一

def query_one(self,users):
# 查询相同name数据,取一个
userList=self.session.query(Users).filter_by(username=users.username).limit(1).all()
self.session.commit()
return userList

  安username查询但是只取第一条数据

    def query_first_with_user_name(self, users):
        # 查询相同name数据,取第一个
        userList=self.session.query(Users).filter_by(username=users.username).first()
        return userList

 安username,password查询所有

def query_all_with_user_name_password(self, users):
# 查询所有相同于name,password数据
userList=self.session.query(Users).filter_by(username=users.username,password=users.password).all()
return userList

 安username查询所有

def query_all_with_user_name(self, users):
# 查询所有相同name数据
userList=self.session.query(Users).filter_by(username=users.username).all()
return userList

 下面的是数据库的测试代码,写测试我用的是TDD

import unittest
from UserORM import Users,UserORMHelper




TEST_DB = ‘test.db‘

class UserORMHelperTestCase(unittest.TestCase):



    def setUp(self):
        """Set up a blank temp database before each test"""
        self.helper=UserORMHelper("user.db")
        self.helper.drop_db()
        self.helper.create_db()


    def tearDown(self):
        """Destroy blank temp database after each test"""
        self.helper.drop_db()

    def test_add(self):
        # 插入测试
        isSuccess=self.helper.addUser(Users("tome", "tom jobn"))
        self.assertTrue(isSuccess)
        isSuccess=self.helper.addUser(Users("tome", "tom jobn"))
        self.assertFalse(isSuccess)


    def test_delete(self):
        # 删除测试
        isSuccess = self.helper.addUser(Users("tome", "tom jobn"))
        self.assertTrue(isSuccess)
        isSucess=self.helper.delete(Users("tome", "tom jobn"))
        self.assertTrue(isSucess)

    def test_query_all(self):
        # 查询所有相同name数据测试
         isSuccess=self.helper.addUser(Users("tome", "tom jobn"))
         self.assertTrue(isSuccess)
         isSuccess=self.helper.addUser(Users("tome", "tom jobn2"))
         self.assertTrue(isSuccess)
         isSuccess=self.helper.addUser(Users("tome", "tom jobn3"))
         self.assertTrue(isSuccess)
         userList=self.helper.query_all_with_user_name(Users("tome", "tom jobn"))
         # print userList
         self.assertEqual(len(userList),3)


    def test_query_none(self):
        # 查询相同name数据,取第一个  测试
         userList=self.helper.query_first_with_user_name(Users("tom", "tom job"))
         self.assertEqual(userList,None)

    def test_query_one(self):
        # 查询相同name数据,取一个   测试
        isSuccess = self.helper.addUser(Users("tome", "tom jobn"))
        self.assertTrue(isSuccess)
        isSuccess = self.helper.addUser(Users("tome", "tom jobn2"))
        self.assertTrue(isSuccess)
        isSuccess = self.helper.addUser(Users("tome", "tom jobn3"))
        self.assertTrue(isSuccess)
        userList = self.helper.query_one(Users("tome", "tom jobn4"))
        self.assertEqual(len(userList), 1)


    def test_revise_extra(self):
        isSuccess = self.helper.addUser(Users("tome", "tom jobn"))
        self.assertTrue(isSuccess)

        isSuccess=self.helper.update_user_extra_by_user_name(Users("tome", "tomUpdate"))

        self.assertTrue(isSuccess)
        userList = self.helper.query_all_with_user_name(Users("tome", "tom job"))
        self.assertEqual(len(userList), 1)


    def test_recise_name(self):
        isSuccess = self.helper.addUser(Users("tome", "tom jobn"))
        self.assertTrue(isSuccess)
        isSuccess = self.helper.update_user_name_by_user_extra(Users("tomeUpdate", "tom jobn"))
        self.assertTrue(isSuccess)
        userList = self.helper.query_all_with_user_name(Users("tomeUpdate", "tom job"))
        self.assertEqual(len(userList),1)















if __name__ == ‘__main__‘:
    unittest.main()

 

 


 

 

 

 

 

 








































































以上是关于Python SqlAlchemy使用方法的主要内容,如果未能解决你的问题,请参考以下文章

python SQLAlchemy 缓存问题

python SQLAlchemy 缓存问题

from flask_sqlalchemy import SQLAlchemy 无法创建数据库表,代码无报错,代码如下,python版本是3.5

Python - 使用 sqlalchemy 的 Postgres 查询返回“空数据框”

python第四十五天 (SQLAlchemy) 的操作

Python SQLAlchemy入门教程