上一篇粗略介绍了 Flask + MySQL + SQLAlchemy 的使用,这次详细讲解 SQLAlchemy 的用法。话不多说,上代码。
----------sqlalchemy_test.py
# -*- coding: utf-8 -*-
"""SQLAlchemy ORM walkthrough against a MySQL database.

Defines two declarative models (``Users`` and ``Address``), creates their
tables at import time, and provides small helpers that insert rows inside an
auto-committing session scope. Running the file as a script opens one session
scope; the query examples inside it are left commented out so they can be
enabled one at a time.
"""
from contextlib import contextmanager

from sqlalchemy import (
    Column,
    ForeignKey,
    Index,
    Integer,
    String,
    UniqueConstraint,
    create_engine,
)
# Only referenced by the commented-out examples below (and_/or_ filters and
# the Date column of the db_admin sketch); kept so the examples work as soon
# as they are uncommented.
from sqlalchemy import Date, and_, or_  # noqa: F401
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, scoped_session, sessionmaker  # noqa: F401

# --- Database connection ----------------------------------------------------
# NOTE(review): the "[email protected]" part of this URL looks like the residue
# of e-mail obfuscation rather than real credentials/host; replace it with the
# actual ``password@host`` before running.
db_connect_string = 'mysql://root:[email protected]:3306/flask?charset=utf8'
# NOTE(review): the 'cert' path contains an empty segment ("/home//ssl"),
# unlike the 'key' and 'ca' paths; confirm the intended directory.
ssl_args = {
    'ssl': {
        'cert': '/home//ssl/client-cert.pem',
        'key': '/home/shouse/ssl/client-key.pem',
        'ca': '/home/shouse/ssl/ca-cert.pem',
    }
}
engine = create_engine(db_connect_string, connect_args=ssl_args)
SessionType = scoped_session(sessionmaker(bind=engine, expire_on_commit=False))


def get_session():
    """Return the module-wide ``scoped_session`` registry.

    The returned object is ``SessionType`` itself; no new session object is
    constructed here.
    """
    return SessionType


@contextmanager
def session_scope():
    """Provide a transactional scope around a series of operations.

    Yields the object returned by :func:`get_session`. When the ``with`` body
    finishes normally the session is committed; when it raises, the session
    is rolled back and the exception is re-raised. The session is closed in
    every case.
    """
    session = get_session()
    try:
        yield session
        session.commit()
    except BaseException:
        # Explicit spelling of the original bare ``except``: roll back on any
        # failure (including KeyboardInterrupt), then let it propagate.
        session.rollback()
        raise
    finally:
        session.close()


# --- Models -----------------------------------------------------------------
# Sketch of an additional table, kept for reference (not active):
#
# class db_admin():
#     # Table name:
#     __tablename__ = 'db_admin'
#     # Table structure:
#     id = Column(Integer, primary_key=True)
#     name = Column(String(255))
#     pwd = Column(String(255))
#     power = Column(String(20))
#     date = Column(Date())

Base = declarative_base()


class Users(Base):
    """Single-table model mapped to ``users``."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    extra = Column(String(16))

    __table_args__ = (
        # (id, name) pairs must be unique.
        UniqueConstraint('id', 'name', name='uix_id_name'),
        # Composite index over (name, extra).
        Index('ix_id_name', 'name', 'extra'),
    )


class Address(Base):
    """Model mapped to ``address``; each row points at a ``users`` row."""

    __tablename__ = 'address'

    id = Column(Integer, primary_key=True)
    address = Column(String(32))
    user_id = Column(Integer, ForeignKey('users.id'))


# Insert examples using a session directly:
#
# obj = Users(name="alex0", extra='sb')
# session.add(obj)
# session.add_all([
#     Users(name="alex1", extra='sb'),
#     Users(name="alex2", extra='sb'),
# ])

# Create every table declared on Base (runs at import time).
Base.metadata.create_all(engine)


def insert(name, fullname):
    """Insert a single ``Users`` row.

    Args:
        name: Value stored in the ``name`` column.
        fullname: Value stored in the ``extra`` column.
    """
    with session_scope() as session:
        # Named ``user`` so the local does not shadow the module-level
        # ``insert_data`` function.
        user = Users(name=name, extra=fullname)
        session.add(user)


def insert_data(data):
    """Insert several rows in one transaction.

    Args:
        data: Iterable of mapped objects (e.g. ``Users`` instances) handed to
            ``session.add_all``.
    """
    with session_scope() as session:
        session.add_all(data)


if __name__ == '__main__':
    # Single insert example:
    # insert('taotao11', 'kk')

    # Bulk insert example:
    # data = [
    #     Users(name='xx', extra='111'),
    #     Users(name='xxxx', extra='xx3131'),
    # ]
    # insert_data(data)

    # --- Query examples (uncomment one at a time) ---
    with session_scope() as session:
        # Query everything
        # rows = session.query(Users).filter()

        # Equality / inequality
        # rows = session.query(Users).filter(Users.name == 'xx')
        # rows = session.query(Users).filter(Users.name != 'xx')

        # LIKE (fuzzy match)
        # rows = session.query(Users).filter(Users.name.like('%tao%'))

        # IN filter
        # rows = session.query(Users).filter(Users.id.in_([1, 3, 5, 7]))

        # NULL values (two spellings of the same filter)
        # rows = session.query(Users).filter(Users.name == None)
        # rows = session.query(Users).filter(Users.name.is_(None))

        # Negation: prefix the condition with ~
        # rows = session.query(Users).filter(~Users.id.in_([1, 3, 5, 7]))

        # Multiple conditions combined with AND -- three ways
        # 1) several criteria in one filter()
        # rows = session.query(Users).filter(Users.name == 'xx', Users.id > 1)
        # 2) and_()
        # rows = session.query(Users).filter(and_(Users.name == 'xx', Users.id > 1))
        # 3) chained filter() calls
        # rows = session.query(Users).filter(Users.name == 'xx').filter(Users.id > 1)

        # Alternatives combined with or_()
        # rows = session.query(Users).filter(or_(Users.name == 'xx', Users.extra == 'kk'))

        # Raw SQL (table name as declared in Users.__tablename__; newer
        # SQLAlchemy releases expect the string wrapped in sqlalchemy.text())
        # rows = session.execute('select * from users')

        # Print whatever the chosen query returned:
        # for v in rows:
        #     print(v.id, v.extra, v.name)
        pass