Django 3.1:带有异步生成器的 StreamingHttpResponse

Posted

技术标签:

【中文标题】Django 3.1:带有异步生成器的 StreamingHttpResponse【英文标题】:Django 3.1: StreamingHttpResponse with an async generator 【发布时间】:2020-11-28 16:18:12 【问题描述】:

Documentation for Django 3.1 谈到异步视图:

主要好处是能够在不使用 Python 线程的情况下为数百个连接提供服务。这允许您使用慢速流式传输、长轮询和其他令人兴奋的响应类型。

我相信“慢速流”意味着我们可以实现 SSE 视图,而无需为每个客户端独占一个线程,因此我尝试绘制一个简单的视图,如下所示:

async def stream(request):

    async def event_stream():
        while True:
            yield 'data: The server time is: %s\n\n' % datetime.datetime.now()
            await asyncio.sleep(1)

    return StreamingHttpResponse(event_stream(), content_type='text/event-stream')

(注意:我改编了this response的代码)

不幸的是,当调用此视图时,它会引发以下异常:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/asgiref/sync.py", line 330, in thread_handler
    raise exc_info[1]
  File "/usr/local/lib/python3.7/site-packages/django/core/handlers/exception.py", line 38, in inner
    response = await get_response(request)
  File "/usr/local/lib/python3.7/site-packages/django/core/handlers/base.py", line 231, in _get_response_async
    response = await wrapped_callback(request, *callback_args, **callback_kwargs)
  File "./chat/views.py", line 144, in watch
    return StreamingHttpResponse(event_stream(), content_type='text/event-stream')
  File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 367, in __init__
    self.streaming_content = streaming_content
  File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 382, in streaming_content
    self._set_streaming_content(value)
  File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 386, in _set_streaming_content
    self._iterator = iter(value)
TypeError: 'async_generator' object is not iterable

对我来说,这表明 StreamingHttpResponse 目前不支持异步生成器。

我尝试将StreamingHttpResponse 修改为使用async for,但我做不到很多。

知道我该怎么做吗?

【问题讨论】:

我 99% 确定它不受支持,因为响应对象中没有任何内容是 awaitable @BenoitBlanchon 那么你的实际目标是什么?是否有一些页面会逐块生成响应(用于大响应),或者有能力在其他事件发生时异步发送 SSE(服务器端事件)?还是完全不同的东西?因为我从您的问题中看到了固定的StreamingHttpResponse,但您仍然没有接受该回复。 我的目标是发送 SSE,我会接受第一个有效的响应。 【参考方案1】:

老实说,Django 本身并不支持它,但我有一个使用 Daphne 的解决方案(也在 Django 频道中使用)。

创建了自己的StreamingHttpResponse 类,该类能够从异步方法中检索数据流并将其提供给 Django 的同步部分。

import asyncio

# By design asyncio does not allow its event loop to be nested.
# Trying to do so will give the error "RuntimeError: This event loop is already running".
# This library solves that problem.
import nest_asyncio

from django.http.response import StreamingHttpResponse


class AsyncStreamingHttpResponse(StreamingHttpResponse):

    def __init__(self, streaming_content=(), *args, **kwargs):
        sync_streaming_content = self.get_sync_iterator(streaming_content)
        super().__init__(streaming_content=sync_streaming_content, *args, **kwargs)

    @staticmethod
    async def convert_async_iterable(stream):
        """Accepts async_generator and async_iterator"""
        return iter([chunk async for chunk in stream])

    def get_sync_iterator(self, async_iterable):
        nest_asyncio.apply()

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        result = loop.run_until_complete(self.convert_async_iterable(async_iterable))
        return result

另外,您需要使用Daphne 运行您的 Django 网络服务器,以正确支持服务器发送事件 (SSE)。它由“Django Software Foundation”官方支持,语法与gunicorn相似,但使用asgi.py而不是wsgi.py

要使用它 - 您可以使用以下方式安装:pip install daphne

并从以下位置更改命令:python manage.py runserver 类似于:daphne -b 0.0.0.0 -p 8000 sse_demo.asgi:application

不确定它是否适用于gunicorn

如果您还有其他问题,请告诉我。

【讨论】:

不好意思问一下,SSE代表什么? 在代码中这只是一个命名(来自作者问题的想法),可以更改。但我相信它是关于Server Sent Events。将类从 SSEResponse 重命名为 AsyncStreamingHttpResponse 以便其他人更清楚。 谢谢,@wowkin2。我尝试使用这个类,但得到了ValueError: loop argument must agree with Future。 Here is the source code of the view;你能查一下吗? @BenoitBlanchon 忘了提及,您需要使用daphne 运行 django 才能使用异步和服务器发送事件。添加了更多详细信息以回答,但这里是要检查的命令:daphne -b 0.0.0.0 -p 8000 sse_demo.asgi:application 我想我已经很清楚了,看看这个页面的顶部:“我将 200 声望授予第一个在 Django 中找到使用异步视图进行 SSE 的正确方法的人3.1+"。【参考方案2】:

另一种实现 SSE 的方法是使用特殊库 django-eventstream:

将以下内容添加到将使用数据的 html 页面:

<script src="% static 'django_eventstream/eventsource.min.js' %"></script>
<script src="% static 'django_eventstream/reconnecting-eventsource.js' %"></script>

var es = new ReconnectingEventSource('/events/');

es.addEventListener('message', function (e) 
    console.log(e.data);
, false);

es.addEventListener('stream-reset', function (e) 
    // ... client fell behind, reinitialize ...
, false);

对于后端,您需要properly setup Django,稍后您将能够在需要执行服务器端事件 (SSE) 的任何视图/任务/信号/方法中调用以下方法:

添加以下将产生数据(事件)的视图:

# from django_eventstream import send_event

send_event('test', 'message', 'text': 'hello world')

【讨论】:

django-eventstream 依赖于 Django Channels,不利用 Django 3.1 的新异步视图。 @BenoitBlanchon 同意,无论如何你可以从任何异步视图中调用它。或者你可以使用应该使用它的纯 Django Channels。【参考方案3】:

看来您必须使用 django-channel 之类的东西:

Channels 增强 Django 以带来 WebSocket、长轮询 HTTP、任务 使用熟悉的 Django 为您的代码卸载和其他异步支持 设计模式和灵活的底层框架,让您不 只自定义行为,还要为自己的行为编写支持 协议和需求。

【讨论】:

感谢@Adrien 的回复,但这并不能回答问题。我知道 Django 频道,但我想知道既然 Django 3.1 支持异步视图,我们是否可以避免它。

以上是关于Django 3.1:带有异步生成器的 StreamingHttpResponse的主要内容,如果未能解决你的问题,请参考以下文章

event_loop 在 Django>=3.1 异步视图中存在多长时间

Django 异步视图中的多个并发请求

Mdb 与 EJB 3.1 异步方法

Django

xadmin引入celery执行异步任务与定时任务

Django:池化 MySQL 数据库连接