Flask SQLAlchemy

Posted Rannie

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flask SQLAlchemy相关的知识,希望对你有一定的参考价值。

Flask SQLAlchemy

models 文件

# pip install  sqlalchemy
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship
Base = declarative_base()

class Users(Base):
    __tablename__ = 'users'  # 数据库表名称
    id = Column(Integer, primary_key=True)  # id 主键
    name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空
    age = Column(Integer)
    #email = Column(String(32), unique=True)
    #datetime.datetime.now#不能加括号,加了括号,以后永远是当前时间
    #ctime = Column(DateTime, default=datetime.datetime.now)
    #extra = Column(Text, nullable=True)

    __table_args__ = (
        # UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
        # Index('ix_id_name', 'name', 'email'), #索引
    )
    def __repr__(self):
        return self.name
    
class Hobby(Base):
    __tablename__ = "hobby"
    id  = Column(Integer,primary_key=True)
    catption =Column(String(50),default="双色球")

class Person(Base):
    __tablename__ = "person"
    nid = Column(Integer,primary_key=True)
    name = Column(String(32))
    #hobby值tablename而不是Hobby类名,
    hobby_id = Column(Integer,ForeignKey("hobby.id"))

    # 更数据库没有关系,不会新增加字段,只能用于快速的链表查询操作
    #relationship的第一个参数,是类名,第二个参数backref,用于反向查询
    hobby =relationship("Hobby",backref="pres")
 

# 一个男孩可以喜欢多个女孩,一个女孩也可以喜欢多个男孩
class Boy2Girl(Base):
    __tablename__ = "boy2girl"
    id = Column(Integer, primary_key=True)
    girl_id = Column(Integer,ForeignKey("girl.id"))
    boy_id = Column(Integer,ForeignKey("boy.id"))


class Girl(Base):
    __tablename__ = "girl"
    id  = Column(Integer,primary_key=True)
    name =  Column(String(100),nullable=False)

    def __repr__(self):
        return self.name

class Boy(Base):
    __tablename__ = "boy"
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    #secondary=boy2girl 中间表的表名
    

def init_db():
    """
    根据类创建数据库表
    :return:
    """
    engine = create_engine(
        "mysql+pymysql://root:@127.0.0.1:3307/python13?charset=utf8",
        #"什么数据库(mysql,orcal)+用什么取链接数据库(pymysql)://数据库用户名:密码@mysqlip:端口/数据库名?charset=字符集"
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.create_all(engine)

def drop_db():
    """
    根据类删除数据库表
    :return:
    """
    engine = create_engine(
         "mysql+pymysql://root:@127.0.0.1:3307/python13?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.drop_all(engine)

if __name__ == '__main__':
    drop_db()
    init_db()

orm.py

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
#"mysql+pymysql://root@127.0.0.1:3306/aaa"
engine = create_engine("mysql+pymysql://root:@127.0.0.1:3307/python13", max_overflow=0, pool_size=5)
Connection = sessionmaker(bind=engine)

# 每次执行数据库操作时,都需要创建一个Connection
con = Connection()

# 1 单增

# obj1  = Users(name="lsb1",age=12)
# con.add(obj1)

# 2 多个增加

# con.add_all([
#          Users(name="lsb1",age=12),
#         Users(name="esb",age=40),
#         Users(name="jsb",age=30),
#         Users(name="tsb",age=12),
#         #Host(name = "tsb",time=123213)
# ])



# 3 删除
# con.query(Users).delete()


#4 改
# con.query(Users).update({"name":"sb","age":14})

# con.query(Users).update({Users.name:Users.name +" is true","age":1},synchronize_session=False)

# con.query(Users).update({Users.age:Users.age + 10})

# 5查(查是不需要commit,也能拿到结果)
#打印sql
# r1 = con.query(Users)

#查询所有
# r1 = con.query(Users).all()
#
#查单条记录
# r1  = con.query(Users).first()

#查哪些字段
# r1  = con.query(Users.age,Users.name.label("sb")).first()

#过滤用filter_by(传参数)或者filter(传表达式)
# r1 = con.query(Users).filter(Users.name == "tsb").first()
# con.query(Users).filter(Users.name == "tsb").update({"name": "sb", "age": 14})


r1 =  con.query(Users).filter_by(name = "esb").first()

print(r1)

#必须提交才能生效
con.commit()

#关闭链接
con.close()



单表查询

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
#"mysql+pymysql://root@127.0.0.1:3306/aaa"
engine = create_engine("mysql+pymysql://root:@127.0.0.1:3307/python13", max_overflow=0, pool_size=5)
Connection = sessionmaker(bind=engine)

# 每次执行数据库操作时,都需要创建一个Connection
session = Connection()

# 条件
# ret =  session.query(Users).filter_by(name = "esb").all()

#表达式,and 条件链接
# ret  = session.query(Users).filter(Users.name == "sb",Users.age ==14 ).first()
# print(ret.age,ret.name)

# 表示的between,条件,30<=age<=40
# ret  = session.query(Users).filter(Users.age.between(30,40)).all()
# print(ret)

# sql查询的in_操作,相当于django中的__in
# ret =session.query(Users).filter(Users.id.in_([9,11,13])).all()
# print(ret)
# # sql查询取反
# ret1 = session.query(Users).filter(~Users.id.in_([9,11,13])).all()
# print(ret1)


#or查询 ,or和and ,做整合
from sqlalchemy import or_,and_

# ret =  session.query(Users).filter(or_(Users.id == 9,Users.name=="jsb")).all()
# ret =  session.query(Users).filter(and_(Users.id == 9,Users.name=="lsb1")).all()

# ret =  session.query(Users).filter(or_(
#     Users.id == 9,
#     and_(Users.name=="jsb",Users.id==13),
#
#     )
# ).all()

# like查询,
#必须以b开头
# ret = session.query(Users).filter(Users.name.like("b%")).all()
# #第二字母是b
# ret = session.query(Users).filter(Users.name.like("_b%")).all()
#不以b开头
# ret = session.query(Users).filter(~Users.name.like("b%")).all()

#排序
#desc重大到小排序
# ret = session.query(Users).filter(Users.id>1).order_by(Users.id.desc()).all()
#desc重小到大排序
#ret = session.query(Users).filter(Users.id>1).order_by(Users.id.asc()).all()
#多条件排序,先以年纪从大到小排,如果年龄相同,再以id从小到大排
# ret = session.query(Users).filter(Users.id>1).order_by(Users.age.desc(),Users.id.asc()).all()
# print(ret)


#分组查询
# ret  = session.query(Users).group_by(Users.name).all()

# 再分组的时候如果要用聚合操作,就要导入func
from sqlalchemy.sql import func
#选出组内最小年龄要大于等于30的组
# ret  = session.query(Users).group_by(Users.name).having(func.min(Users.age)>=30).all()

#选出组内最小年龄要大于等于30的组,查询组内的最小年龄,最大年纪,年纪之和,
ret = session.query(
    func.min(Users.age),
    func.max(Users.age),
    func.sum(Users.age),
    Users.name
).group_by(Users.name).having(func.min(Users.age)>=30).all()
print(ret)

一对多关联

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Hobby,Person
#"mysql+pymysql://root@127.0.0.1:3306/aaa"
engine = create_engine("mysql+pymysql://root:@127.0.0.1:3307/python13", max_overflow=0, pool_size=5)
Connection = sessionmaker(bind=engine)

# 每次执行数据库操作时,都需要创建一个Connection
session = Connection()

#1添加,没有用关联关系
# session.add_all([
#     Hobby(catption="淫诗"),
#     Hobby(catption="推背"),
#     Person(name="tank",hobby_id=1),
#     Person(name="jason",hobby_id=2)
# ])

# 2添加 用关联关系
# preson = Person(name="egon",hobby=Hobby(catption="相亲"))
#session.add(preson)
#
# hobb = Hobby(catption="人妖")
# hobb.pres = [Person(name="owen"),Person(name="sean")]
# session.add(hobb)

#session.commit()

#正向查询
# pr = session.query(Person).filter( Person.name == "tank").first()
# print(pr.name)
# print(pr.hobby.catption)


#反向查
# v = session.query(Hobby).filter(Hobby.catption=="人妖").first()
# print(v.catption)
# print(v.pres)


# 自己连表,isouter=True表示是left join,不填默认为inner join
person_list =  session.query( Hobby).join(Person,Person.hobby_id==Hobby.id,isouter=True)
#
print(person_list)

session.close()

多对多

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Boy,Boy2Girl,Girl
#"mysql+pymysql://root@127.0.0.1:3306/aaa"
engine = create_engine("mysql+pymysql://root:@127.0.0.1:3307/python13", max_overflow=0, pool_size=5)
Connection = sessionmaker(bind=engine)

# 每次执行数据库操作时,都需要创建一个Connection
session = Connection()

#添加
# session.add_all([
#     Boy(name="tank"),
#     Boy(name="sean"),
#     Girl(name="仓老师"),
#     Girl(name="小泽老师")
# ])

# b2g = Boy2Girl(boy_id=1,girl_id=2)
# session.add(b2g )
# b2g = Boy2Girl(boy_id=2,girl_id=1)
# session.add(b2g )
# session.commit()
# session.close()

#
# boy = Boy(name="亚峰")
# boy.girl=[Girl(name="迪丽热巴"),Girl(name="三上")]
# session.add(boy)
# session.commit()

#
# girl = Girl(name="丹丹")
# girl.boys=[Boy(name="吴彦祖"),Boy(name="鹿晗")]
# session.add(girl)
# session.commit()

# 使用relationship的关系,正向查

# b = session.query(Boy).filter(Boy.name == "亚峰").first()
# print(b.name)
# print(b.girl)

#反向查询
# g = session.query(Girl).filter(Girl.name=="丹丹").first()
# print(g.name)
# print(g.boys)

flask_sqlalchemy

要用就必须先安装。
所有的到导入都找 下面的db
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()

flask_migrate

命令:manager.add_command('db1', MigrateCommand)
1 当项目第一次执行迁移的时候。
python3 manage.py db1 init 只需要初始化一次

2 python3 manage.py db1 migrate # 等同于django的makemigrations

3 python3 manage.py db1 upgrade # 等同于django的migrate

以上是关于Flask SQLAlchemy的主要内容,如果未能解决你的问题,请参考以下文章

flask_sqlalchemy 调用存储过程

如何强制 Flask/SQLAlchemy 创建表?

Flask 学习-13.Flask-SQLAlchemy 新建模型和字段

深入浅出Flask(47):flask_sqlalchemy的应用动态绑定

使用 Flask-SQLAlchemy 连接到 MSSQL 数据库

07-flask-使用sqlalchemy