Django:分页器 + 原始 SQL 查询
Posted
技术标签:
【中文标题】Django:分页器 + 原始 SQL 查询【英文标题】:Django: Paginator + raw SQL query 【发布时间】:2011-02-01 17:07:10 【问题描述】:我在我的网站上到处都在使用 Django Paginator,甚至写了一个特殊的模板标签,以使其更方便。但是现在我遇到了一个状态,我需要进行一个复杂的自定义原始 SQL 查询,如果没有 LIMIT
将返回大约 100K 记录。
如何将 Django Pagintor 与自定义查询一起使用?
我的问题的简化示例:
我的模特:
class PersonManager(models.Manager):
def complicated_list(self):
from django.db import connection
#Real query is much more complex
cursor.execute("""SELECT * FROM `myapp_person`""");
result_list = []
for row in cursor.fetchall():
result_list.append(row[0]);
return result_list
class Person(models.Model):
name = models.CharField(max_length=255);
surname = models.CharField(max_length=255);
age = models.IntegerField();
objects = PersonManager();
我在 Django ORM 中使用分页的方式:
all_objects = Person.objects.all();
paginator = Paginator(all_objects, 10);
try:
page = int(request.GET.get('page', '1'))
except ValueError:
page = 1
try:
persons = paginator.page(page)
except (EmptyPage, InvalidPage):
persons = paginator.page(paginator.num_pages)
这样,Django 变得非常聪明,并在执行查询时将LIMIT
添加到查询中。但是当我使用自定义管理器时:
all_objects = Person.objects.complicated_list();
选择所有数据,然后才切片python列表,这非常慢。如何让我的自定义管理器表现得像内置的一样?
【问题讨论】:
在 Python 中,您不应该在您的 Person 类中随时使用空格。 【参考方案1】:查看 Paginator 的源代码,尤其是 page() function,我认为只需在您身边实现 slicing,并将其转换为 SQL 查询中的相关 LIMIT 子句即可。您可能还需要添加一些缓存,但这开始看起来像 QuerySet,所以也许您可以做其他事情:
您可以使用 CREATE VIEW myview AS [your query] 创建数据库 VIEW; 为该视图添加 Django 模型,Meta: managed=False 像使用任何其他模型一样使用该模型,包括对其查询集进行切片 - 这意味着它非常适合与 Paginator 一起使用(供您参考 - 我已经使用这种方法很长时间了,即使与伪造 m2m 中间表的 VIEW 存在复杂的多对多关系。)
【讨论】:
哇,太酷了 :) 感谢您的回复,但我想我看错了方向。我问了另一个问题:***.com/questions/2532686/…Guess 自定义管理器在这里不是最好的。【参考方案2】:这是我创建的 RawPaginator
类,它覆盖 Paginator
以处理原始查询。它需要一个额外的参数count
,它是您的查询的总数。它不会对object_list
进行切片,因为您必须通过OFFSET
和LIMIT
在原始查询中进行分页。
from django.core.paginator import Paginator
class RawPaginator(Paginator):
def __init__(self, object_list, per_page, count, **kwargs):
super().__init__(object_list, per_page, **kwargs)
self.raw_count = count
def _get_count(self):
return self.raw_count
count = property(_get_count)
def page(self, number):
number = self.validate_number(number)
return self._get_page(self.object_list, number, self)
【讨论】:
非常好的解决方案。谢谢。 当你调用新的 Paginator 类时,count
作为参数时会取什么值?【参考方案3】:
我不了解 Django 1.1,但如果您可以等待 1.2(不应该再那么长),您可以使用 objects.raw()
,如 this article 和 development documentation 中所述。
否则,如果查询不是太复杂,也许使用extra
clause就足够了。
【讨论】:
感谢您提供有用的提示。但我想这对我的情况没有帮助 您仍然无法获取原始查询结果的计数。看起来您实际上必须执行 list(objects.raw()) 才能使其与分页器一起使用。感谢 ***.com/questions/2317452/django-count-rawqueryset 提供的信息。【参考方案4】:我还想插入我写的PaginatedRawQuerySet
(请将此视为 alpha 版本)。这为原始查询集添加了切片能力。请参考to this answer——这是我为另一个具有类似要求的问题而写的——以了解它是如何工作的(尤其是最后的“注意事项”部分)。
from django.db import models
from django.db.models import sql
from django.db.models.query import RawQuerySet
class PaginatedRawQuerySet(RawQuerySet):
def __init__(self, raw_query, **kwargs):
super(PaginatedRawQuerySet, self).__init__(raw_query, **kwargs)
self.original_raw_query = raw_query
self._result_cache = None
def __getitem__(self, k):
"""
Retrieves an item or slice from the set of results.
"""
if not isinstance(k, (slice, int,)):
raise TypeError
assert ((not isinstance(k, slice) and (k >= 0)) or
(isinstance(k, slice) and (k.start is None or k.start >= 0) and
(k.stop is None or k.stop >= 0))), \
"Negative indexing is not supported."
if self._result_cache is not None:
return self._result_cache[k]
if isinstance(k, slice):
qs = self._clone()
if k.start is not None:
start = int(k.start)
else:
start = None
if k.stop is not None:
stop = int(k.stop)
else:
stop = None
qs.set_limits(start, stop)
return qs
qs = self._clone()
qs.set_limits(k, k + 1)
return list(qs)[0]
def __iter__(self):
self._fetch_all()
return iter(self._result_cache)
def count(self):
if self._result_cache is not None:
return len(self._result_cache)
return self.model.objects.count()
def set_limits(self, start, stop):
limit_offset = ''
new_params = tuple()
if start is None:
start = 0
elif start > 0:
new_params += (start,)
limit_offset = ' OFFSET %s'
if stop is not None:
new_params = (stop - start,) + new_params
limit_offset = 'LIMIT %s' + limit_offset
self.params = self.params + new_params
self.raw_query = self.original_raw_query + limit_offset
self.query = sql.RawQuery(sql=self.raw_query, using=self.db, params=self.params)
def _fetch_all(self):
if self._result_cache is None:
self._result_cache = list(super().__iter__())
def __repr__(self):
return '<%s: %s>' % (self.__class__.__name__, self.model.__name__)
def __len__(self):
self._fetch_all()
return len(self._result_cache)
def _clone(self):
clone = self.__class__(raw_query=self.raw_query, model=self.model, using=self._db, hints=self._hints,
query=self.query, params=self.params, translations=self.translations)
return clone
【讨论】:
以上是关于Django:分页器 + 原始 SQL 查询的主要内容,如果未能解决你的问题,请参考以下文章