Django:分页器 + 原始 SQL 查询

Posted

技术标签:

【中文标题】Django:分页器 + 原始 SQL 查询【英文标题】:Django: Paginator + raw SQL query 【发布时间】:2011-02-01 17:07:10 【问题描述】:

我在我的网站上到处都在使用 Django Paginator,甚至写了一个特殊的模板标签,以使其更方便。但是现在我遇到了一个状态,我需要进行一个复杂的自定义原始 SQL 查询,如果没有 LIMIT 将返回大约 100K 记录。

如何将 Django Pagintor 与自定义查询一起使用?

我的问题的简化示例:

我的模特:

class PersonManager(models.Manager):

    def complicated_list(self):

        from django.db import connection

        #Real query is much more complex        
        cursor.execute("""SELECT * FROM `myapp_person`""");  

        result_list = []

        for row in cursor.fetchall():
            result_list.append(row[0]); 

        return result_list


class Person(models.Model):
    name      = models.CharField(max_length=255);
    surname   = models.CharField(max_length=255);     
    age       = models.IntegerField(); 

    objects   = PersonManager();

我在 Django ORM 中使用分页的方式:

all_objects = Person.objects.all();

paginator = Paginator(all_objects, 10);

try:
    page = int(request.GET.get('page', '1'))
except ValueError:
    page = 1

try:
    persons = paginator.page(page)
except (EmptyPage, InvalidPage):
    persons = paginator.page(paginator.num_pages)

这样,Django 变得非常聪明,并在执行查询时将LIMIT 添加到查询中。但是当我使用自定义管理器时:

all_objects = Person.objects.complicated_list();

选择所有数据,然后才切片python列表,这非常慢。如何让我的自定义管理器表现得像内置的一样?

【问题讨论】:

在 Python 中,您不应该在您的 Person 类中随时使用空格。 【参考方案1】:

查看 Paginator 的源代码,尤其是 page() function,我认为只需在您身边实现 slicing,并将其转换为 SQL 查询中的相关 LIMIT 子句即可。您可能还需要添加一些缓存,但这开始看起来像 QuerySet,所以也许您可以做其他事情:

您可以使用 CREATE VIEW myview AS [your query] 创建数据库 VIEW; 为该视图添加 Django 模型,Meta: managed=False 像使用任何其他模型一样使用该模型,包括对其查询集进行切片 - 这意味着它非常适合与 Paginator 一起使用

(供您参考 - 我已经使用这种方法很长时间了,即使与伪造 m2m 中间表的 VIEW 存在复杂的多对多关系。)

【讨论】:

哇,太酷了 :) 感谢您的回复,但我想我看错了方向。我问了另一个问题:***.com/questions/2532686/…Guess 自定义管理器在这里不是最好的。【参考方案2】:

这是我创建的 RawPaginator 类,它覆盖 Paginator 以处理原始查询。它需要一个额外的参数count,它是您的查询的总数。它不会对object_list 进行切片,因为您必须通过OFFSETLIMIT 在原始查询中进行分页。

from django.core.paginator import Paginator

class RawPaginator(Paginator):
    def __init__(self, object_list, per_page, count, **kwargs):
        super().__init__(object_list, per_page, **kwargs)
        self.raw_count = count

    def _get_count(self):
        return self.raw_count
    count = property(_get_count)

    def page(self, number):
        number = self.validate_number(number)
        return self._get_page(self.object_list, number, self)

【讨论】:

非常好的解决方案。谢谢。 当你调用新的 Paginator 类时,count 作为参数时会取什么值?【参考方案3】:

我不了解 Django 1.1,但如果您可以等待 1.2(不应该再那么长),您可以使用 objects.raw(),如 this article 和 development documentation 中所述。

否则,如果查询不是太复杂,也许使用extra clause就足够了。

【讨论】:

感谢您提供有用的提示。但我想这对我的情况没有帮助 您仍然无法获取原始查询结果的计数。看起来您实际上必须执行 list(objects.raw()) 才能使其与分页器一起使用。感谢 ***.com/questions/2317452/django-count-rawqueryset 提供的信息。【参考方案4】:

我还想插入我写的PaginatedRawQuerySet(请将此视为 alpha 版本)。这为原始查询集添加了切片能力。请参考to this answer——这是我为另一个具有类似要求的问题而写的——以了解它是如何工作的(尤其是最后的“注意事项”部分)。

from django.db import models
from django.db.models import sql
from django.db.models.query import RawQuerySet


class PaginatedRawQuerySet(RawQuerySet):
    def __init__(self, raw_query, **kwargs):
        super(PaginatedRawQuerySet, self).__init__(raw_query, **kwargs)
        self.original_raw_query = raw_query
        self._result_cache = None

    def __getitem__(self, k):
        """
        Retrieves an item or slice from the set of results.
        """
        if not isinstance(k, (slice, int,)):
            raise TypeError
        assert ((not isinstance(k, slice) and (k >= 0)) or
                (isinstance(k, slice) and (k.start is None or k.start >= 0) and
                 (k.stop is None or k.stop >= 0))), \
            "Negative indexing is not supported."

        if self._result_cache is not None:
            return self._result_cache[k]

        if isinstance(k, slice):
            qs = self._clone()
            if k.start is not None:
                start = int(k.start)
            else:
                start = None
            if k.stop is not None:
                stop = int(k.stop)
            else:
                stop = None
            qs.set_limits(start, stop)
            return qs

        qs = self._clone()
        qs.set_limits(k, k + 1)
        return list(qs)[0]

    def __iter__(self):
        self._fetch_all()
        return iter(self._result_cache)

    def count(self):
        if self._result_cache is not None:
            return len(self._result_cache)

        return self.model.objects.count()

    def set_limits(self, start, stop):
        limit_offset = ''

        new_params = tuple()
        if start is None:
            start = 0
        elif start > 0:
            new_params += (start,)
            limit_offset = ' OFFSET %s'
        if stop is not None:
            new_params = (stop - start,) + new_params
            limit_offset = 'LIMIT %s' + limit_offset

        self.params = self.params + new_params
        self.raw_query = self.original_raw_query + limit_offset
        self.query = sql.RawQuery(sql=self.raw_query, using=self.db, params=self.params)

    def _fetch_all(self):
        if self._result_cache is None:
            self._result_cache = list(super().__iter__())

    def __repr__(self):
        return '<%s: %s>' % (self.__class__.__name__, self.model.__name__)

    def __len__(self):
        self._fetch_all()
        return len(self._result_cache)

    def _clone(self):
        clone = self.__class__(raw_query=self.raw_query, model=self.model, using=self._db, hints=self._hints,
                               query=self.query, params=self.params, translations=self.translations)
        return clone

【讨论】:

以上是关于Django:分页器 + 原始 SQL 查询的主要内容,如果未能解决你的问题,请参考以下文章

在Django REST List API视图中对原始SQL查询进行分页的最佳方法是什么?

Django自定义分页器

Django中的分页器以及手绘验证码

Django自定义分页器

django分页后查询丢失

drf分页器