python SQLAlchemy

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python SQLAlchemy相关的知识,希望对你有一定的参考价值。

定义表

定义数据表,才能进行sql表达式的操作,毕竟sql表达式的表的确定,是sqlalchemy制定的,如果数据库已经存在了数据表还需要定义么?当然,这里其实是一个映射关系,如果不指定,查询表达式就不知道是附加在那个表的操作,当然定义的时候,注意表名和字段名,代码和数据的必须保持一致。定义好之后,就能创建数据表,一旦创建了,再次运行创建的代码,数据库是不会创建的。


from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey
# 连接数据库 
engine = create_engine("mysql://root:@localhost:3306/webpy?charset=utf8",encoding="utf-8", echo=True)
# 获取元数据
metadata = MetaData()
# 定义表
user = Table(‘user‘, metadata,        Column(‘id‘, Integer, primary_key=True),        Column(‘name‘, String(20)),        Column(‘fullname‘, String(40)),
    )

address = Table(‘address‘, metadata,        Column(‘id‘, Integer, primary_key=True),        Column(‘user_id‘, None, ForeignKey(‘user.id‘)),        Column(‘email‘, String(60), nullable=False)
    )
# 创建数据表,如果数据表存在,则忽视
metadata.create_all(engine)
# 获取数据库连接
conn = engine.connect()
插入 insert

有了数据表和连接对象,对应数据库操作就简单了。

>>> i = user.insert()   # 使用查询>>> i  
<sqlalchemy.sql.dml.Insert object at 0x0000000002637748>>>> print i  # 内部构件的sql语句INSERT INTO "user" (id, name, fullname) VALUES (:id, :name, :fullname)>>> u = dict(name=‘jack‘, fullname=‘jack Jone‘)>>> r = conn.execute(i, **u)  # 执行查询,第一个为查询对象,第二个参数为一个插入数据字典,如果插入的是多个对象,就把对象字典放在列表里面>>> r
<sqlalchemy.engine.result.ResultProxy object at 0x0000000002EF9390>>>> r.inserted_primary_key  # 返回插入行 主键 id[4L]>>> addresses
[{‘user_id‘: 1, ‘email‘: ‘[email protected]‘}, {‘user_id‘: 1, ‘email‘: ‘[email protected]‘}, {‘user_id‘: 2, ‘email‘: ‘[email protected]‘}, {‘user_id‘: 2, ‘email‘: ‘[email protected]‘}]>>> i = address.insert()>>> r = conn.execute(i, addresses)   # 插入多条记录>>> r
<sqlalchemy.engine.result.ResultProxy object at 0x0000000002EB5080>>>> r.rowcount   #返回影响的行数4L>>> i = user.insert().values(name=‘tom‘, fullname=‘tom Jim‘)>>> i.compile()
<sqlalchemy.sql.compiler.SQLCompiler object at 0x0000000002F6F390>>>> print i.compile()INSERT INTO "user" (name, fullname) VALUES (:name, :fullname)>>> print i.compile().params
{‘fullname‘: ‘tom Jim‘, ‘name‘: ‘tom‘}>>> r = conn.execute(i)>>> r.rowcount1L
查询 select

查询方式很灵活,多数时候使用 sqlalchemy.sql 下面的 select方法

from sqlalchemy import select 

>>> s = select([user])  # 查询 user表>>> s
<sqlalchemy.sql.selectable.Select at 0x25a7748; Select object>>>> print sSELECT "user".id, "user".name, "user".fullname 
FROM "user"


如果需要查询自定义的字段,可是使用 user 的cloumn 对象,例如

>>> user.c  # 表 user 的字段column对象<sqlalchemy.sql.base.ImmutableColumnCollection object at 0x0000000002E804A8>>>> print user.c
[‘user.id‘, ‘user.name‘, ‘user.fullname‘]>>> s = select([user.c.name,user.c.fullname])>>> r = conn.execute(s)>>> r
<sqlalchemy.engine.result.ResultProxy object at 0x00000000025A7748>>>> r.rowcount  # 影响的行数5L>>> ru = r.fetchall()  
>>> ru
[(u‘hello‘, u‘hello world‘), (u‘Jack‘, u‘Jack Jone‘), (u‘Jack‘, u‘Jack Jone‘), (u‘jack‘, u‘jack Jone‘), (u‘tom‘, u‘tom Jim‘)]>>> r  
<sqlalchemy.engine.result.ResultProxy object at 0x00000000025A7748>>>> r.closed  # 只要 r.fetchall() 之后,就会自动关闭 ResultProxy 对象True


同时查询两个表

>>> s = select([user.c.name, address.c.user_id]).where(user.c.id==address.c.user_id)   # 使用了字段和字段比较的条件>>> s
<sqlalchemy.sql.selectable.Select at 0x2f03390; Select object>>>> print sSELECT "user".name, address.user_id 
FROM "user", address 
WHERE "user".id = address.user_id

操作符

>>> print user.c.id == address.c.user_id  # 返回一个编译的字符串"user".id = address.user_id>>> print user.c.id == 7"user".id = :id_1   # 编译成为带参数的sql 语句片段字符串>>> print user.c.id != 7"user".id != :id_1>>> print user.c.id > 7"user".id > :id_1>>> print user.c.id == None"user".id IS NULL>>> print user.c.id + address.c.id   # 使用两个整形的变成 +"user".id + address.id>>> print user.c.name + address.c.email  # 使用两个字符串 变成 ||"user".name || address.email
操作连接

这里的连接指条件查询的时候,逻辑运算符的连接,即 and or 和 not

print and_(
        user.c.name.like(‘j%‘),
        user.c.id == address.c.user_id,
        or_(
            address.c.email == ‘[email protected]‘,
            address.c.email == ‘[email protected]‘
        ),        not_(user.c.id>5))"user".name LIKE :name_1 AND "user".id = address.user_id AND (address.email = :email_1 OR address.email = :email_2) AND "user".id <= :id_1

得到的结果为 编译的sql语句片段,下面看一个完整的例子

>>> se_sql = [(user.c.fullname +", " + address.c.email).label(‘title‘)]>>> wh_sql = and_(
              user.c.id == address.c.user_id,
              user.c.name.between(‘m‘, ‘z‘),
              or_(
                  address.c.email.like(‘%@aol.com‘),
                  address.c.email.like(‘%@msn.com‘)
              )
         )>>> print wh_sql"user".id = address.user_id AND "user".name BETWEEN :name_1 AND :name_2 AND (address.email LIKE :email_1 OR address.email LIKE :email_2)>>> s = select(se_sql).where(wh_sql)>>> print sSELECT "user".fullname || :fullname_1 || address.email AS title 
FROM "user", address 
WHERE "user".id = address.user_id AND "user".name BETWEEN :name_1 AND :name_2 AND (address.email LIKE :email_1 OR address.email LIKE :email_2)>>> r = conn.execute(s)>>> r.fetchall()

使用 raw sql 方式

遇到负责的sql语句的时候,可以使用 sqlalchemy.sql 下面的 text 函数。将字符串的sql语句包装编译成为 execute执行需要的sql对象。例如:、

>>> text_sql = "SELECT id, name, fullname FROM user WHERE id=:id"  # 原始sql语句,参数用( :value)表示>>> s = text(text_sql)>>> print s
SELECT id, name, fullname FROM user WHERE id=:id>>> s
<sqlalchemy.sql.elements.TextClause object at 0x0000000002587668>>>> conn.execute(s, id=3).fetchall()   # id=3 传递:id参数[(3L, u‘Jack‘, u‘Jack Jone‘)]
连接 join

连接有join 和 outejoin 两个方法,join 有两个参数,第一个是join 的表,第二个是on 的条件,joing之后必须要配合select_from 方法

>>> print user.join(address)"user" JOIN address ON "user".id = address.user_id   # 因为开启了外键 ,所以join 能只能识别 on 条件>>> print user.join(address, address.c.user_id==user.c.id)  # 手动指定 on 条件"user" JOIN address ON address.user_id = "user".id>>> s = select([user.c.name, address.c.email]).select_from(user.join(address, user.c.id==address.c.user_id))  # 被jion的sql语句需要用 select_from方法配合>>> s
<sqlalchemy.sql.selectable.Select at 0x2eb63c8; Select object>>>> print s
SELECT "user".name, address.email 
FROM "user" JOIN address ON "user".id = address.user_id>>> conn.execute(s).fetchall()
[(u‘hello‘, u‘[email protected]‘), (u‘hello‘, u‘[email protected]‘), (u‘hello‘, u‘[email protected]‘), (u‘hello‘, u‘[email protected]‘), (u‘Jack‘, u‘[email protected]‘), (u‘Jack‘, u‘[email protected]‘), (u‘Jack‘, u‘[email protected]‘), (u‘Jack‘, u‘[email protected]‘)]
排序 分组 分页

排序使用 order_by 方法,分组是 group_by ,分页自然就是limit 和 offset两个方法配合

>>> s = select([user.c.name]).order_by(user.c.name)  # order_by>>> print s
SELECT "user".name 
FROM "user" ORDER BY "user".name>>> s = select([user]).order_by(user.c.name.desc())>>> print s
SELECT "user".id, "user".name, "user".fullname 
FROM "user" ORDER BY "user".name DESC>>> s = select([user]).group_by(user.c.name)       # group_by>>> print s
SELECT "user".id, "user".name, "user".fullname 
FROM "user" GROUP BY "user".name>>> s = select([user]).order_by(user.c.name.desc()).limit(1).offset(3)  # limit(1).offset(3)>>> print s
SELECT "user".id, "user".name, "user".fullname 
FROM "user" ORDER BY "user".name DESC
 LIMIT :param_1 OFFSET :param_2
[(4L, u‘jack‘, u‘jack Jone‘)]
更新 update

前面都是一些查询,更新和插入的方法很像,都是 表下面的方法,不同的是,update 多了一个 where 方法 用来选择过滤

>>> s = user.update()>>> print sUPDATE "user" SET id=:id, name=:name, fullname=:fullname>>> s = user.update().values(fullname=user.c.name)           # values 指定了更新的字段>>> print sUPDATE "user" SET fullname="user".name>>> s = user.update().where(user.c.name == ‘jack‘).values(name=‘ed‘)  # where 进行选择过滤>>> print s 
UPDATE "user" SET name=:name WHERE "user".name = :name_1>>> r = conn.execute(s)>>> print r.rowcount         # 影响行数3

还有一个高级用法,就是一次命令执行多个记录的更新,需要用到 bindparam 方法

 from sqlalchemy import bindparam

>>> s = user.update().where(user.c.name==bindparam(‘oldname‘)).values(name=bindparam(‘newname‘))   # oldname 与下面的传入的从拿书进行绑定,newname也一样>>> print sUPDATE "user" SET name=:newname WHERE "user".name = :oldname>>> u = [{‘oldname‘:‘hello‘, ‘newname‘:‘edd‘},
{‘oldname‘:‘ed‘, ‘newname‘:‘mary‘},
{‘oldname‘:‘tom‘, ‘newname‘:‘jake‘}]>>> r = conn.execute(s, u)>>> r.rowcount5L
删除 delete

删除比较容易,调用 delete方法即可,不加 where 过滤,则删除所有数据,但是不会drop掉表,等于清空了数据表

>>> r = conn.execute(address.delete()) # 清空表>>> print r
<sqlalchemy.engine.result.ResultProxy object at 0x0000000002EAF550>>>> r.rowcount8L>>> r = conn.execute(users.delete().where(users.c.name > ‘m‘)) # 删除记录>>> r.rowcount3L


以上是关于python SQLAlchemy的主要内容,如果未能解决你的问题,请参考以下文章

python SQLAlchemy 缓存问题

from flask_sqlalchemy import SQLAlchemy 无法创建数据库表,代码无报错,代码如下,python版本是3.5

Python SQLAlchemy入门教程

sql 可以运行,但是当你用 sqlalchemy 运行 python 代码时你失败了

Python 之 sqlalchemy查询数据

从 python 使用 TSQL(SQL Server 上的 mssql)时,如何为 SQLAlchemy 自动生成 ORM 代码?