sqlalchemy学习
Posted lilied
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sqlalchemy学习相关的知识,希望对你有一定的参考价值。
楔子
sql语言
# Win-rate statistics per uid, joined with the monthly trade summary.
#   num1      - share of rows whose sell balance exceeds the buy balance
#   num2      - total row count
#   profitnum - number of rows whose sell balance exceeds the buy balance
#   avgnum    - trade_days itself when <= 20, otherwise trades per day scaled to 20 days
# NOTE(review): aggregates are mixed with a.uid and no GROUP BY is visible here - confirm.
SELECT
a.uid,
count( if (a.total_sell_balance>a.total_buy_balance,true,null)) / count(*) as num1,
count(*) as num2,
count( if (a.total_sell_balance > a.total_buy_balance,true,null)) as profitnum,
CASE
# If b.trade_days <= 20, return b.trade_days unchanged
WHEN b.trade_days <= 20 THEN b.trade_days
WHEN b.trade_days > 20 THEN (b.trade_num / b.trade_days) * 20
END as avgnum,
b.trade_days as trade_days
FROM tb_stat_win_rate as a
LEFT JOIN tb_stat_month_trade as b on a.uid=b.uid
对应的sqlalchemy ORM
# COUNT(IF(cond, TRUE, NULL)) counts only the rows where the sell balance
# exceeds the buy balance, because COUNT ignores NULLs.
profitable_count = func.count(
    func.if_(
        TbStatWinRate.total_sell_balance > TbStatWinRate.total_buy_balance,
        True,
        None,
    )
)
total_count = func.count(TbStatWinRate.uid)

query = db.session.query(
    TbStatWinRate.uid,
    (profitable_count / total_count).label("num1"),
    total_count.label("num2"),
    profitable_count.label("profitnum"),
    # trade_days itself when <= 20, otherwise trades per day scaled to 20 days.
    func.if_(
        TbStatMonthTrade.trade_days <= 20,
        TbStatMonthTrade.trade_days,
        (TbStatMonthTrade.trade_num / TbStatMonthTrade.trade_days) * 20,
    ).label("avgnum"),
    TbStatMonthTrade.trade_days,
).outerjoin(
    # TbStatWinRate is already the left-hand entity, so the LEFT JOIN target
    # must be TbStatMonthTrade. outerjoin() is always an outer join and takes
    # no `isouter` argument.
    TbStatMonthTrade,
    TbStatWinRate.uid == TbStatMonthTrade.uid,
)
第一节:sqlalchemy介绍:
1.SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用对象关系映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据库API执行SQL并获取执行结果。
2.SQLAlchemy本身无法操作数据库,其必须依赖pymysql等第三方插件,Dialect用于和数据库API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
MySQL-Python
mysql+mysqldb://<username>:<password>@<host>[:<port>]/<dbname>[?<options>]
pymysql
mysql+pymysql://<username>:<password>@<host>[:<port>]/<dbname>[?<options>]
cx_Oracle
oracle+cx_oracle://<user>:<pass>@<host>:<port>/<dbname>[?key=value&key=value...]
更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html
注意 :
写入数据库表汉字时候 需要加编码 charset=utf8
# NOTE: user, password and host below are placeholders - substitute your own.
engine = create_engine('mysql+pymysql://root:password@127.0.0.1:3306/db2?charset=utf8')  # 1: connect to an existing database
engine = create_engine('sqlite:///dbyuan67.db', echo=True)  # 2: echo=True prints the generated SQL
第二节: 创建表
# NOTE: user, password and host below are placeholders - substitute your own.
engine = create_engine('mysql+pymysql://root:password@127.0.0.1:3306/db2?charset=utf8')  # 1: connect to an existing database
engine = create_engine('sqlite:///dbyuan67.db', echo=True)  # 2: echo=True prints the generated SQL
第二节: 创建表
sql 写法
# Create the `person` table: auto-increment integer primary key, required name,
# age defaulting to 20, sex defaulting to 1, nullable t_time; utf8 charset.
CREATE TABLE `person`(
`id` INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
`name` VARCHAR(20) NOT NULL,
`age` INT NOT NULL DEFAULT 20,
`sex` SMALLINT DEFAULT 1,
`t_time` DATETIME NULL
) DEFAULT CHARSET='utf8';
sql expression language 写法
# -*- coding: utf-8 -*-
# @Author: Lai
from sqlalchemy import (Table, MetaData, create_engine,
Column, Integer, String, SmallInteger, DateTime)
from datetime import datetime
from sqlalchemy.orm import mapper, sessionmaker
# NOTE: user, password and host are placeholders - substitute your own.
engine = create_engine("mysql+mysqldb://root:password@127.0.0.1/todo?charset=utf8")
metadata = MetaData()

# Table definition: "user" with an auto-increment primary key; sex defaults
# to 1 and create_time to the insertion time (datetime.now is passed as a
# callable, so it is evaluated per insert).
user = Table(
    "user",
    metadata,
    Column("id", Integer, nullable=False, primary_key=True, autoincrement=True),
    Column("username", String(20), nullable=False),
    Column("age", Integer, nullable=False),
    Column("sex", SmallInteger, default=1),
    Column("create_time", DateTime, default=datetime.now),
)
# model
class User(object):
    """Plain model class holding the values of one "user" row.

    Only arguments that are actually supplied are stored as attributes;
    omitted ones (None) are left unset.
    """

    def __init__(self, username=None, age=None, sex=None):
        # Compare against None instead of relying on truthiness, so that
        # legitimate falsy values such as sex=0 or age=0 are not dropped.
        if username is not None:
            self.username = username
        if age is not None:
            self.age = age
        if sex is not None:
            self.sex = sex
# Map the User model onto the "user" table.
mapper(User, user)

if __name__ == "__main__":
    # metadata.create_all(bind=engine)  # create the table
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        # Named new_user so the module-level `user` Table is not rebound.
        new_user = User("rose", 20, 0)
        session.add(new_user)
        session.commit()
    except Exception as e:
        print(e)
        session.rollback()
    finally:
        # Release the connection even if rollback() itself raises.
        session.close()
sql orm 写法
# -*- coding: utf-8 -*-
# @Author: Lai
from datetime import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import (create_engine, Column, Integer, String, SmallInteger, DateTime)
from sqlalchemy.orm import Session
# NOTE: user, password and host are placeholders - substitute your own.
engine = create_engine("mysql+mysqldb://root:password@127.0.0.1/todo?charset=utf8")
# Declarative base class that the ORM models below inherit from.
Base = declarative_base()
class Human(Base):
    """Declarative ORM model mapped to the "human" table."""

    __tablename__ = "human"

    id = Column("id", Integer, autoincrement=True, primary_key=True)
    name = Column("name", String(20), nullable=False, unique=True)
    age = Column("age", Integer, nullable=False)
    sex = Column("sex", SmallInteger, default=1)
    # datetime.now is passed as a callable, so it is evaluated per insert.
    create_time = Column("create_time", DateTime, default=datetime.now)

    def __repr__(self):
        # The format string needs a placeholder, otherwise the name is
        # silently dropped from the output.
        return "name {}".format(self.name)
if __name__ == "__main__":
    # Base.metadata.create_all(bind=engine)  # create the table
    session = Session(bind=engine)
    # h = Human(name="king001", age=30, sex=1)
    # session.add(h)
    # Human.__table__ converts the ORM class to its classic Table object.
    try:
        # Each name needs the index appended ("king002" ... "king0010");
        # without the placeholder every row would be "king00" and break the
        # unique constraint on `name`.
        res = [
            Human(name="king00{}".format(i), age=i, sex=1)
            for i in range(2, 11)
        ]
        session.add_all(res)
        session.commit()
    except Exception as e:
        print(e)
        session.rollback()
    finally:
        # Release the connection even if rollback() itself raises.
        session.close()
第三节: 反射表
sql expression language 写法
# -*- coding: utf-8 -*-
# @Author: Lai
from datetime import datetime
from sqlalchemy import Table, create_engine, MetaData, select, func
# NOTE: user, password and host are placeholders - substitute your own.
engine = create_engine("mysql+mysqldb://root:password@127.0.0.1/todo?charset=utf8")
metadata = MetaData()
# Open a connection.
conn = engine.connect()
# Reflect a single table. autoload_with alone triggers reflection; the
# separate autoload=True flag is redundant and deprecated in newer releases.
human = Table("human", metadata, autoload_with=engine)
# Alternative: reflect the whole database, then look the table up by name.
metadata.reflect(bind=engine)
human = metadata.tables.get('human')
# Insert rows: one row, then several rows in one call.
def insert():
    """Insert rows into `human` via the shared connection, then close it."""
    ins = human.insert()
    # A single dict of parameters inserts one row.
    conn.execute(ins, {"name": "ppp", "age": 20, "sex": 1})
    # A list of dicts inserts several rows with one execute() call.
    # NOTE(review): "ppp" is inserted a second time here; this fails if
    # `human.name` carries a unique constraint - confirm.
    conn.execute(ins, [
        {"name": "ppp", "age": 20, "sex": 1},
        {"name": "mmm", "age": 30, "sex": 0},
    ])
    conn.close()
# Insert, variant 2: the values are bound on the statement itself.
def insert2():
    """Insert one fully specified row into `human`, then close the connection."""
    row = dict(name="bbb", age=40, sex=0, create_time=datetime.now())
    statement = human.insert().values(**row)
    conn.execute(statement)
    conn.close()
# Update
def update():
    """Rename rows named "bbb" to "vvv", then close the connection."""
    has_old_name = human.c.name == "bbb"
    statement = human.update().where(has_old_name).values(name="vvv")
    conn.execute(statement)
    conn.close()
# Delete
def delete():
    """Delete rows named "vvv", then close the connection."""
    is_target = human.c.name == "vvv"
    statement = human.delete().where(is_target)
    conn.execute(statement)
    conn.close()
# Query
def _select():
    """Print name and age of the first two rows whose age is above 20."""
    wanted_columns = [human.c.name, human.c.age]
    statement = select(wanted_columns).where(human.c.age > 20)
    statement = statement.limit(2).offset(0)
    rows = conn.execute(statement).fetchall()
    print(rows)
# Query 2
def _select2():
    """Print, per sex value, the row count and the sum of ages."""
    aggregates = [
        human.c.sex,
        func.count(human.c.id),
        func.sum(human.c.age),
    ]
    statement = select(aggregates).group_by(human.c.sex)
    rows = conn.execute(statement).fetchall()
    print(rows)
# Relationship chain: product ----> project ----> requirement
# Query 3 (join query)
def select3():
    """Print project, product and requirement names joined across three tables.

    Generated SQL:

        SELECT project.prj_name AS prj_name, product.prod_name AS prod_name, requirement.req_name AS req_name
        FROM requirement JOIN project ON project.id = requirement.prj_id JOIN product ON product.id = project.prod_id
    """
    # The three tables are not defined elsewhere in this script, so fetch
    # them from the reflected metadata.
    requirement = metadata.tables["requirement"]
    project = metadata.tables["project"]
    product = metadata.tables["product"]

    ####################################################################
    # Note: for multi-table joins, keep chaining
    # xxx_table.join(xx, xx == xx).join(xx, xx == xx) and so on.
    ####################################################################
    joined = requirement.join(
        project, project.c.id == requirement.c.prj_id
    ).join(
        product, product.c.id == project.c.prod_id
    )
    s = select([
        project.c.prj_name.label("prj_name"),
        product.c.prod_name.label("prod_name"),
        requirement.c.req_name.label("req_name"),
    ]).select_from(joined)
    # Use the module-level connection; a plain function has no `self`.
    res = conn.execute(s).fetchall()
    print(res)
if __name__ == "__main__":
    # Script entry point: run the grouped query.
    _select2()
sql orm 写法
# -*- coding: utf-8 -*-
# @Author: Lai
from sqlalchemy.ext.automap import automap_base
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from datetime import datetime
# NOTE: user, password and host are placeholders - substitute your own.
engine = create_engine("mysql+mysqldb://root:password@127.0.0.1/todo?charset=utf8")
Base = automap_base()
Base.prepare(engine, reflect=True)
# ORM class obtained by reflection for the "human" table.
Human = Base.classes.human
# Session used to talk to the database.
session = Session(bind=engine)
# Insert a row
def insert():
    """Add one Human row and commit it."""
    fields = {
        "name": "vcr",
        "age": 67,
        "sex": 1,
        "create_time": datetime.now(),
    }
    session.add(Human(**fields))
    # session.add_all([x, y]) adds several objects at once.
    session.commit()
# Update a row
def update():
    """Rename the first Human named "vcr" to "vccrr"; do nothing if none exists."""
    h_obj = session.query(Human).filter_by(name="vcr").first()
    if h_obj is None:
        # first() returns None when no row matches; nothing to update.
        return
    # The object was loaded through this session, so the attribute change is
    # tracked automatically and no extra session.add() is required.
    h_obj.name = "vccrr"
    session.commit()
# Delete a row
def delete():
    """Delete the first Human named "vccrr"; do nothing if none exists."""
    h_obj = session.query(Human).filter_by(name="vccrr").first()
    if h_obj is None:
        # first() returns None when no row matches; session.delete(None)
        # would raise, so stop here.
        return
    session.delete(h_obj)
    session.commit()
# Query rows
def select():
    """Print every Human whose id is greater than 7."""
    matching = session.query(Human).filter(Human.id > 7)
    # Iterating the query executes it; list() collects the rows.
    print(list(matching))
if __name__ == "__main__":
    # Script entry point: run the query.
    select()
以上是关于sqlalchemy学习的主要内容,如果未能解决你的问题,请参考以下文章