EXAMPLE FOR PEEWEE 多姿势使用 PEEWEE
Posted piperck
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了EXAMPLE FOR PEEWEE 多姿势使用 PEEWEE相关的知识,希望对你有一定的参考价值。
使用 PEEWEE 断断续续的差不多已经三个年头了,但是没有像这次使用这么多的特性和功能,所以这次一并记录一下,需要注意的地方和一些使用细节,之后使用起来可能会更方便。
因为是使用的 SQLAchedemy 的引擎,所以增删改查的语法也很像。
查找方法
cls 这里默认指类对象了
查找单个会使用:
cls.get_one(cls.user_id == user_id)
查找批量可以使用:
cls.select().where(cls.user_id == user_id)
批量查找会返回一个数组,可以使用 for in 语句迭代他们。另外需要注意的一点是,如果需要指定查询的内容可以使用:
cls.select(cls.user_id, cls.id).where(cls.user_id == user_id)
里面也可以使用一些聚合函数例如 count sum 之类的
cls.select(cls.user_id, fn.COUNT(cls.id).alias(‘hahah‘)).where(xxxxx)
添加数据方法
User.create(username=‘Charlie‘)
类似使用这种语法,我们可以构造一个key: value 的字典,最后使用
User.create(**data)
一并写入即可。
注意,使用这种方法的返回值是有几条结果受到了影响。如果我们像知道添加了之后的主键 id 可以使用:
User.insert(username=‘Mickey‘).execute()
这种插入方法会返回 主键值。一般我们可以认为是 id 值。
批量插入方法
批量插入提供了很多种方法:
data_source = [ {‘field1‘: ‘val1-1‘, ‘field2‘: ‘val1-2‘}, {‘field1‘: ‘val2-1‘, ‘field2‘: ‘val2-2‘}, # ... ] for data_dict in data_source: MyModel.create(**data_dict)
写到一个数组中,然后遍历一个一个插。。。亲测这是最慢的,如果另外几种行得通最好不要用。
with db.atomic(): for data_dict in data_source: MyModel.create(**data_dict)
修改刚才那种方法的提交模式,让他们在同一个事务里面提交以此来提高速度。这次我的场景其实对插入速度要求很高,尝试了这种方法,但是我每次插入500条虽然速度是提高了不少,但是当我写到2000条的时候就报了一个事务的错误,在网上搜了一下也无果。所以对于量很大的插入,感觉这个方法也不是很好。。。或者欢迎来人指出的我使用姿势的问题。
另外的姿势:
# Fastest. MyModel.insert_many(data_source).execute() # Fastest using tuples and specifying the fields being inserted. fields = [MyModel.field1, MyModel.field2] data = [(‘val1-1‘, ‘val1-2‘), (‘val2-1‘, ‘val2-2‘), (‘val3-1‘, ‘val3-2‘)] MyModel.insert_many(data, fields=fields).execute() # You can, of course, wrap this in a transaction as well: with db.atomic(): MyModel.insert_many(data, fields=fields).execute()
第一种方法就是一个 list 然后里面是字典,只要你能保证这个结构,就可以进行批量插入。亲测速度不错,也没有遇到事务报错的问题。这次使用的就是这个方法,下面第二种第三种写法,感觉构造起来不是很友好。所以没有使用,有兴趣的朋友可以尝试。
修改数据方法
修改数据方法提供了普通的更新和原子更新方法。
普通的更新:
>>> today = datetime.today() >>> query = Tweet.update(is_published=True).where(Tweet.creation_date < today) >>> query.execute() # Returns the number of rows that were updated. 4
原子更新:
>>> query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
>>> query.execute()
>>> subquery = Tweet.select(fn.COUNT(Tweet.id)).where(Tweet.user == User.id) >>> update = User.update(num_tweets=subquery) >>> update.execute()
还有一种 upsert 操作。如果指定键存在就执行更新操作,如果不存在执行插入操作。
下面提供了两种写法。
# Insert or update the user. The "last_login" value will be updated # regardless of whether the user existed previously. user_id = (User .replace(username=‘the-user‘, last_login=datetime.now()) .execute()) # This query is equivalent: user_id = (User .insert(username=‘the-user‘, last_login=datetime.now()) .on_conflict_replace() .execute())
另外 mysql 还提供了一种独有的语法 ON DUPLICATE KEY UPDATE 可以使用以下方法实现。
class User(Model): username = TextField(unique=True) last_login = DateTimeField(null=True) login_count = IntegerField() # Insert a new user. User.create(username=‘huey‘, login_count=0) # Simulate the user logging in. The login count and timestamp will be # either created or updated correctly. now = datetime.now() rowid = (User .insert(username=‘huey‘, last_login=now, login_count=1) .on_conflict( preserve=[User.last_login], # Use the value we would have inserted. update={User.login_count: User.login_count + 1}) .execute())
上面的例子要注意 username 申明的 unique=true 如果模型上不申明可能会报错。
删除方法
删除方法跟普通的 orm 似乎设计得很像。要么获得一个 instance 然后调用 delete() 方法进行删除,要么就是直接条件查询直接删除,来看例子。
>>> user = User.get(User.id == 1) >>> user.delete_instance() # Returns the number of rows deleted.
批量删除
>>> query = Tweet.delete().where(Tweet.creation_date < one_year_ago) >>> query.execute() # Returns the number of rows deleted. 7
联表查询方法
query = (Tag
.select()
.join(PhotoTag)
.join(Photo)
.group_by(Tag)
.having(fn.Count(Photo.id) > 5))
其实 peewee 提供的方法就非常独立和干净。你可以按照自己的需求构造足够复杂的 sql ,语句也比较清晰。需要注意的是这几点,在联表查询的时候,select()函数里面需要写上自己需要的返回值,否则联表之后会发现竟然还是只有自己 cls 的字段,就会感觉到很疑惑。
query = User .select(User, UserDatum, Card7daysRecv) .join(UserDatum, JOIN.LEFT_OUTER, on=(User.uid == UserDatum.user_id)) .join(Card7daysRecv, JOIN.LEFT_OUTER, on=(User.uid == Card7daysRecv.receive_uid)).order_by(User.create_time).limit(limit).offset(offset)
另外在遇到复杂查询的时候有个不可缺少的工具就是打印以下自己的 orm 究竟构造了什么语句。可以使用 query.sql() 方便的看到。
另外谈下在 model 构造上的时候遇到的一些问题。
如果要往模型里面插数据,建议最好还是指定好 主键和默认值,以免读取模型的时候报错。需要的字段一定要在模型里面写上,否则拿的时候会发现没有这个字段。
class HdNewUserInfo(AliyunModel): class Meta: db_table = ‘hd_new_user_info‘ _version = 0 user_id = CharField(64, default=‘‘, primary_key=True) phone = CharField(32, default=‘‘) phone_province = CharField(64, default=‘‘) phone_city = CharField(64, default=‘‘) phone_community_id = CharField(32, default=‘‘) name = CharField(64, default=‘‘) head_img = CharField(1024, default=‘‘) student_no = CharField(36, default=‘‘)
另外还有一个 database gone away 的问题。注意构造好自己的 db
class Off(Model): class Meta: database = db.offline_db_08_yanzhi @classmethod def get_one(cls, *query, **kwargs): try: return cls.get(*query, **kwargs) except DoesNotExist: return None
初始化自己的 db。并且构造一个可以重连的 retrydb
class RetryOperationalError(object): def execute_sql(self, sql, params=None, commit=True): try: cursor = super(RetryOperationalError, self).execute_sql( sql, params, commit) except OperationalError: if not self.is_closed(): self.close() with __exception_wrapper__: cursor = self.cursor() cursor.execute(sql, params or ()) if commit and not self.in_transaction(): self.commit() return cursor class MyRetryDB(RetryOperationalError, MySQLDatabase): pass
这次玩数据暂时就遇到这些问题,以后再来补充。
Reference:
https://github.com/coleifer/peewee/issues/114 Print SQL queries
http://docs.peewee-orm.com/en/latest/peewee/query_examples.html#retrieve-the-start-times-of-members-bookings 联表查 example
https://stackoverflow.com/questions/15559468/why-is-peewee-including-the-id-column-into-the-mysql-select-query column into id 1 报错解决
以上是关于EXAMPLE FOR PEEWEE 多姿势使用 PEEWEE的主要内容,如果未能解决你的问题,请参考以下文章