sqlalchemy
Posted mitsuhide1992
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sqlalchemy相关的知识,希望对你有一定的参考价值。
Using Subqueries
是指需要用到子查询的时候,如select ... from (select .. from)
The Query is suitable for generating statements which can be used as subqueries. Suppose we wanted to load User objects along with a count of how many Address records each user has. The best way to generate SQL like this is to get the count of addresses grouped by user ids, and JOIN to the parent. In this case we use a LEFT OUTER JOIN so that we get rows back for those users who don’t have any addresses, e.g.:
SELECT users.*, adr_count.address_count FROM users LEFT OUTER JOIN
(SELECT user_id, count(*) AS address_count
FROM addresses GROUP BY user_id) AS adr_count
ON users.id=adr_count.user_id
Using the Query, we build a statement like this from the inside out. The statement accessor returns a SQL expression representing the statement generated by a particular Query - this is an instance of a select() construct, which are described in SQL Expression Language Tutorial:
>>> from sqlalchemy.sql import func
>>> stmt = session.query(Address.user_id, func.count('*').\\
... label('address_count')).\\
... group_by(Address.user_id).subquery()
The func keyword generates SQL functions, and the subquery() method on Query produces a SQL expression construct representing a SELECT statement embedded within an alias (it’s actually shorthand for query.statement.alias()).
Once we have our statement, it behaves like a Table construct, such as the one we created for users at the start of this tutorial. The columns on the statement are accessible through an attribute called c:
SQL>>> for u, count in session.query(User, stmt.c.address_count).\\
... outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id):
... print(u, count)
<User(name='ed', fullname='Ed Jones', password='f8s7ccs')> None
<User(name='wendy', fullname='Wendy Williams', password='foobar')> None
<User(name='mary', fullname='Mary Contrary', password='xxg527')> None
<User(name='fred', fullname='Fred Flinstone', password='blah')> None
<User(name='jack', fullname='Jack Bean', password='gjffdd')> 2
子查询举例:
ResourceTableSub = session.query(
ResourceTable.id, ResourceTable.host, ResourceTable.db_name, ResourceTable.table_name).filter(
ResourceTable.host == host, ResourceTable.db_name == db, ResourceTable.table_name == table
).subquery()
ResourceSub = session.query(Resource.id).join(ResourceTableSub, ResourceTableSub.c.id == Resource.resource_type_idx).subquery()
ResourceTaskRelationSub = session.query(
ResourceTaskRelation.task_id, ResourceTaskRelation.resource_id, ResourceTaskRelation.resource_type)\\
.join(ResourceSub, ResourceSub.c.id == ResourceTaskRelation.id).subquery()
ResourceTaskRelationTaskIdSub = session.query(ResourceTaskRelationSub.c.task_id).filter(ResourceTaskRelationSub.c.resource_type == ResourceTaskRelation.TARGET).subquery()
TaskSub = session.query(Task.id).join(ResourceTaskRelationTaskIdSub, ResourceTaskRelationTaskIdSub.c.task_id == Task.id).subquery()
PlanSub = session.query(Plan.id).join(TaskSub, TaskSub.c.id == Plan.id).subquery()
JobSub = session.query(Job.scheduler_id, Job.status, Job.parent_status, Job.schedule_time).join(PlanSub, PlanSub.c.id == Job.plan_id).subquery()
JobPartSub = session.query(JobSub.c.scheduler_id, JobSub.c.status, JobSub.c.parent_status)\\
.filter(JobSub.c.schedule_time >= datetime.datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)).subquery()
SchedulerSub = session.query(Scheduler.id, Scheduler.queue_id, JobPartSub.c.status, JobPartSub.c.parent_status).join(JobPartSub, JobPartSub.c.scheduler_id == Scheduler.id).subquery()
QueueSub = session.query(Queue.id, Queue.queue_name, SchedulerSub.c.status, SchedulerSub.c.parent_status).join(SchedulerSub, SchedulerSub.c.queue_id == Queue.id).subquery()
jobsAllQ = session.query(QueueSub.c.status, QueueSub.c.parent_status).filter(QueueSub.c.queue_name == 'bigqueue')
jobs = jobsAllQ.all()
Selecting Entities from Subqueries
Above, we just selected a result that included a column from a subquery. What if we wanted our subquery to map to an entity ? For this we use aliased() to associate an “alias” of a mapped class to a subquery:
SQL>>> stmt = session.query(Address).\\
... filter(Address.email_address != 'j25@yahoo.com').\\
... subquery()
>>> adalias = aliased(Address, stmt)
>>> for user, address in session.query(User, adalias).\\
... join(adalias, User.addresses):
... print(user)
... print(address)
<User(name='jack', fullname='Jack Bean', password='gjffdd')>
<Address(email_address='jack@google.com')>
执行原生SQL语句
SQLAlchemy提供给我们非常丰富的数据库查询方法,比如下面的代码中我们可以直接将查询条件以字符串的方式传入filter方法:
#String parameters
for user in session.query(User).filter("id<224").\\
order_by('id').all():
print user.id,user.name
其相当于执行了下面的SQL语句:
SELECT users.id AS users_id, users.name AS users_name,
users.fullname AS users_fullname, users.password AS users_password
FROM users
WHERE id<224 ORDER BY id
上面的例子中我们直接指定了id的值小于244,如果我们希望动态传入参数,可以使用name=:value的方式,即以一个冒号:作为参数变量的前缀,然后在params方法中给出具体的参数值:
#Named parameters
user_fred= session.query(User).filter("id<:value and name=:name").\\
params(value=224,name='Fred').order_by(User.id).one()
print user.id,user.name,user.fullname
其相当于执行了下面的SQL语句:
SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
FROM users
WHERE id<? and name=? ORDER BY users.id
或者我们可以直接把完整的查询语句放入filter方法中:
#String based statement
for user in session.query(User).from_statement(
"select * from users where name=:name"
).params(name='ed').all():
print user.id,user.name,user.fullname
我们甚至可以不使用映射类,直接在query方法中指定查询结果字段,然后在filter方法中写入SQL语句:
#Raw SQL
for user in session.query("id","name","thenumber12").\\
from_statement(
"select id,name,12 as thenumber12 from users where name=:name"
).params(name='ed').all():
print user.id,user.name,user.thenumber12
其相当于执行以下的SQL语句:
select id,name,12 as thenumber12 from users where name=?
2011-12-16 13:11:22,619 INFO sqlalchemy.engine.base.Engine ('ed',)
要注意的是,在使用原生字符串SQL查询的时候,如果涉及到嵌套查询,或者一个SQL中出现相同的表名的情况下,假如希望按照明某个表字段进行排序,那么必须明确给出,是按照哪个表的字段排序:
#Pros and Cons of literal SQL
q=session.query(User.id,User.name)
for user in q.order_by("name").all():
print user
from sqlalchemy import func
from sqlalchemy.orm import aliased
ua=aliased(User)
q=q.from_self(User.id,User.name,ua.name).filter(User.name<ua.name).\\
filter(func.length(ua.name)!=func.length(User.name))
for user in q.order_by("name").all():
print user
for user in q.order_by(ua.name).all():
print user
for user in q.order_by(User.name).all():
print user
上面的例子中,第一句直接用”name”来排序,但是SQLAlchemy不知道是应该用哪个User表的name排序,所以后面两个方法中,我们分别给出了明确的指示,其执行SQL语句如下:
--statement 1
SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS anon_1_users_name, users_1.name AS users_1_name
FROM (SELECT users.id AS users_id, users.name AS users_name
FROM users) AS anon_1, users AS users_1
WHERE anon_1.users_name < users_1.name AND length(users_1.name) != length(anon_1.users_name) ORDER BY name
--statement 2
SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS anon_1_users_name, users_1.name AS users_1_name
FROM (SELECT users.id AS users_id, users.name AS users_name
FROM users) AS anon_1, users AS users_1
WHERE anon_1.users_name < users_1.name AND length(users_1.name) != length(anon_1.users_name) ORDER BY users_1.name
--statement 3
SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS anon_1_users_name, users_1.name AS users_1_name
FROM (SELECT users.id AS users_id, users.name AS users_name
FROM users) AS anon_1, users AS users_1
WHERE anon_1.users_name < users_1.name AND length(users_1.name) != length(anon_1.users_name) ORDER BY anon_1.users_name
以上是关于sqlalchemy的主要内容,如果未能解决你的问题,请参考以下文章
flask-sqlalchemy 和sqlalchemy的区别
sqlalchemy.exc.NoSuchModuleError:无法加载插件:sqlalchemy.dialects:postgres