Error with asynchronous request in DRF

【Title】: Error with asynchronous request in DRF 【Posted】: 2018-05-16 14:04:02 【Question】:

I need to make requests to two services. The code looks like this:

async def post1(data):
    response = await aiohttp.request('post', 'http://', json=data)
    json_response = await response.json()
    response.close()
    return json_response

async def get2():
    response = await aiohttp.request('get', 'http://')
    json_response = await response.json()
    response.close()
    return json_response

async def asynchronous(parameters):

    task1 = post1(parameters['data'])
    task2 = get2()

    result_list = []
    for body in await asyncio.gather(task1, task2):
        result_list.append(body)
    return result_list

If I run the code locally, it works. The code looks like this:

if __name__ == "__main__":
    ioloop = asyncio.get_event_loop()
    parameters = {'data': data}
    result = ioloop.run_until_complete(asynchronous(parameters))
    ioloop.close()
    print(result)

I get the correct result. But if I try to run the same code from a DRF method, I get an error:

TypeError: object _SessionRequestContextManager can't be used in 'await' expression

The sample code I run:

 .....
 class MyViewSet(GenericAPIView):
    def post(self, request, *args, **kwargs):
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        ......

        ioloop = asyncio.get_event_loop()
        result = ioloop.run_until_complete(asynchronous(serializer.data)) # <<<<< error here
        ioloop.close()

        ......
        return Response(serializer.data, status=status.HTTP_201_CREATED)

Can you tell me what the problem might be?


【Answer 1】:

The object returned by aiohttp.request cannot be awaited; it must be used as an asynchronous context manager. This code:

response = await aiohttp.request('post', 'http://', json=data)
json_response = await response.json()
response.close()

needs to be changed to:

async with aiohttp.request('post', 'http://', json=data) as response:
    json_response = await response.json()

For more usage examples, see the aiohttp documentation.
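
The difference between awaiting such an object and entering it with `async with` can be reproduced without aiohttp. The following is a minimal sketch, not aiohttp's actual implementation: `FakeRequestContext` and `FakeResponse` are hypothetical stand-ins that only implement the async context manager protocol, which is the situation the TypeError describes.

```python
import asyncio


class FakeResponse:
    """Hypothetical stand-in for an aiohttp response object."""

    def __init__(self, payload):
        self._payload = payload

    async def json(self):
        return self._payload


class FakeRequestContext:
    """Hypothetical stand-in for aiohttp's _SessionRequestContextManager.

    It implements only __aenter__/__aexit__ and deliberately has no
    __await__, so it can be entered but not awaited.
    """

    def __init__(self, payload):
        self._payload = payload

    async def __aenter__(self):
        return FakeResponse(self._payload)

    async def __aexit__(self, exc_type, exc, tb):
        return False  # do not swallow exceptions


async def broken_call():
    # Mirrors `response = await aiohttp.request(...)` from the question.
    response = await FakeRequestContext({"ok": True})
    return await response.json()


async def fixed_call():
    # Mirrors `async with aiohttp.request(...) as response:` from the answer.
    async with FakeRequestContext({"ok": True}) as response:
        return await response.json()


def run_broken():
    """Run the broken variant and report the exception type it raises."""
    try:
        asyncio.run(broken_call())
    except TypeError as exc:
        return type(exc).__name__
    return None


if __name__ == "__main__":
    print(run_broken())
    print(asyncio.run(fixed_call()))
```

The `async with` form also releases the response when the block exits, which is why the explicit `response.close()` is no longer needed.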

Perhaps the server running DRF has a different aiohttp version installed, which would explain why the code works locally and fails under DRF.
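
One way to check the version hypothesis is to print the installed aiohttp version in both environments and compare. This is a small sketch using only the standard library (`importlib.metadata`, Python 3.8+); the helper name `installed_version` is made up for illustration.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of a package, or None if absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Run this both locally and inside the environment that serves DRF.
    print("aiohttp:", installed_version("aiohttp"))
```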


【Answer 2】:

Try https://github.com/Skorpyon/aiorestframework

Create some middleware for authentication:

import re
from xml.etree import ElementTree as etree
from json.decoder import JSONDecodeError
from multidict import MultiDict, MultiDictProxy

from aiohttp import web
from aiohttp.hdrs import (
    METH_POST, METH_PUT, METH_PATCH, METH_DELETE
)

from aiorestframework import exceptions
from aiorestframework import serializers
from aiorestframework import Response
from aiorestframework.views import BaseViewSet
from aiorestframework.permissions import set_permissions, AllowAny
from aiorestframework.app import APIApplication

from my_project.settings import Settings  # Generated by aiohttp-devtools


TOKEN_RE = re.compile(r'^\s*BEARER\s{,3}(\S{64})\s*$')

async def token_authentication(app, handler):
    """
    Authorization middleware
    Catching Authorization: BEARER <token> from request headers
    Found user in Tarantool by token and bind User or AnonymousUser to request
    """
    async def middleware_handler(request):
        # Check that `Authorization` header exists
        if 'authorization' in request.headers:
            authorization = request.headers['authorization']
            # Check matches in header value
            match = TOKEN_RE.match(authorization)
            if not match:
                setattr(request, 'user', AnonymousUser())
                return await handler(request)
            else:
                token = match[1]
        elif 'authorization_token' in request.query:
            token = request.query['authorization_token']
        else:
            setattr(request, 'user', AnonymousUser())
            return await handler(request)

        # Try select user auth record from Tarantool by token index
        res = await app['tnt'].select('auth', [token, ])
        cached = res.body
        if not cached:
            raise exceptions.AuthenticationFailed()

        # Build basic user data and bind it to User instance
        record = cached[0]
        user = User()
        user.bind_cached_tarantool(record)

        # Add User to request
        setattr(request, 'user', user)

        return await handler(request)
    return middleware_handler
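
The header check in this middleware can be tried in isolation. The sketch below assumes the pattern is `^\s*BEARER\s{,3}(\S{64})\s*$`, that is, the literal word BEARER, up to three whitespace characters, and a token of exactly 64 non-space characters; `extract_token` is a hypothetical helper, not part of aiorestframework.

```python
import re

# Assumed pattern: BEARER, at most three whitespace characters, 64-char token.
TOKEN_RE = re.compile(r'^\s*BEARER\s{,3}(\S{64})\s*$')


def extract_token(header_value):
    """Return the token from an Authorization header value, or None."""
    match = TOKEN_RE.match(header_value)
    return match[1] if match else None


if __name__ == "__main__":
    good = "BEARER " + "a" * 64
    print(extract_token(good) == "a" * 64)
    print(extract_token("BEARER too-short"))
```

Note that the pattern is case-sensitive, so a header written as `Bearer <token>` would fall through to the anonymous-user branch unless the regex is compiled with `re.IGNORECASE`.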

To extract data from the request:

DATA_METHODS = [METH_POST, METH_PUT, METH_PATCH, METH_DELETE]
JSON_CONTENT = ['application/json', ]
XML_CONTENT = ['application/xml', ]
FORM_CONTENT = ['application/x-www-form-urlencoded', 'multipart/form-data']

async def request_data_handler(app, handler):
    """
    Request .data middleware
    Try extract POST data or application/json from request body
    """
    async def middleware_handler(request):
        data = None
        if request.method in DATA_METHODS:
            if request.has_body:
                if request.content_type in JSON_CONTENT:
                    # If request has body - try to decode it to JSON
                    try:
                        data = await request.json()
                    except JSONDecodeError:
                        raise exceptions.ParseError()
                elif request.content_type in XML_CONTENT:
                    if request.charset is not None:
                        encoding = request.charset
                    else:
                        encoding = api_settings.DEFAULT_CHARSET
                    parser = etree.XMLParser(encoding=encoding)
                    try:
                        text = await request.text()
                        tree = etree.XML(text, parser=parser)
                    except (etree.ParseError, ValueError) as exc:
                        raise exceptions.ParseError(
                            detail='XML parse error - %s' % str(exc))
                    data = tree
                elif request.content_type in FORM_CONTENT:
                    data = await request.post()

        if data is None:
            # If not catch any data create empty MultiDictProxy
            data = MultiDictProxy(MultiDict())

        # Attach extracted data to request
        setattr(request, 'data', data)

        return await handler(request)
    return middleware_handler
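
The content-type dispatch in this middleware can be illustrated with a synchronous, stdlib-only sketch. Everything here is an assumption for illustration: `parse_body` and the local `ParseError` class are hypothetical, the body is taken as an already decoded string, and only URL-encoded forms are handled (multipart parsing is left out).

```python
import json
from urllib.parse import parse_qs
from xml.etree import ElementTree


class ParseError(Exception):
    """Hypothetical stand-in for aiorestframework's exceptions.ParseError."""


def parse_body(content_type, body):
    """Decode a request body string according to its content type."""
    if not body:
        return {}  # nothing to parse, mirror the "empty data" fallback
    if content_type == 'application/json':
        try:
            return json.loads(body)
        except json.JSONDecodeError as exc:
            raise ParseError('JSON parse error - %s' % exc) from exc
    if content_type == 'application/xml':
        try:
            return ElementTree.fromstring(body)
        except ElementTree.ParseError as exc:
            raise ParseError('XML parse error - %s' % exc) from exc
    if content_type == 'application/x-www-form-urlencoded':
        return parse_qs(body)  # values are lists, one entry per occurrence
    return {}  # unknown content type: no data extracted


if __name__ == "__main__":
    print(parse_body('application/json', '{"name": "Ann"}'))
    print(parse_body('application/x-www-form-urlencoded', 'a=1&b=2'))
    print(parse_body('application/xml', '<user><id>7</id></user>').find('id').text)
```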

Create a few serializers:

class UserRegisterSerializer(s.Serializer):
    """Register new user"""
    email = s.EmailField(max_length=256)
    password = s.CharField(min_length=8, max_length=64)
    first_name = s.CharField(min_length=2, max_length=64)
    middle_name = s.CharField(default='', min_length=2, max_length=64, 
                              required=False, allow_blank=True)
    last_name = s.CharField(min_length=2, max_length=64)
    phone = s.CharField(max_length=32, required=False,
                        allow_blank=True, default='')

    async def register_user(self, app):
        user = User()
        data = self.validated_data
        try:
            await user.register_user(data, app)
        except Error as e:
            resolve_db_exception(e, self)
        return user

And some ViewSets. They can be nested in bindings['custom']:

class UserViewSet(BaseViewSet):
    name = 'user'
    lookup_url_kwarg = '{user_id:[0-9a-f]{32}}'
    permission_classes = [AllowAny, ]
    bindings = {
        'list': {
            'retrieve': 'get',
            'update': 'put'
        },
        'custom': {
            'list': {
                'set_status': 'post',
                'create_new_sip_password': 'post',
                'get_registration_domain': 'get',
                'report': UserReportViewSet
            }
        }
    }

    @staticmethod
    async def resolve_sip_host(data, user, app):
        sip_host = await resolve_registration_switch(user, app)
        data.update({'sip_host': sip_host})

    async def retrieve(self, request):
        user = User()
        await user.load_from_db(request.match_info['user_id'], request.app)

        serializer = user_ser.UserProfileSerializer(instance=user)
        data = serializer.data
        await self.resolve_sip_host(data, user, request.app)
        return Response(data=data)

    @atomic
    @set_permissions([AuthenticatedOnly, IsCompanyMember, CompanyIsEnabled])
    async def update(self, request):
        serializer = user_ser.UserProfileSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        user = await serializer.update_user(user_id=request.user.id,
                                            app=request.app)

        serializer = user_ser.UserProfileSerializer(instance=user)
        data = serializer.data
        await self.resolve_sip_host(data, user, request.app)

        return Response(data=data)
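
The `user_id` lookup in this ViewSet restricts which URLs reach `retrieve`. aiohttp variable routes use the `{name:regex}` form, and the sketch below assumes the intended regex is `[0-9a-f]{32}`, which is the shape of `uuid.uuid4().hex`. The helper `is_valid_user_id` is hypothetical and only demonstrates what such a pattern accepts.

```python
import re
import uuid

# Assumed regex part of the route variable: 32 lowercase hex characters.
USER_ID_RE = re.compile(r'[0-9a-f]{32}')


def is_valid_user_id(value):
    """Return True if the whole value is 32 lowercase hex characters."""
    return USER_ID_RE.fullmatch(value) is not None


if __name__ == "__main__":
    print(is_valid_user_id(uuid.uuid4().hex))   # hex form, no hyphens
    print(is_valid_user_id(str(uuid.uuid4())))  # hyphenated form
```

Under that assumption, a hyphenated or uppercase UUID in the URL would not match the route at all, so the handler would never be called for it.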

Register the ViewSets and run the application:

def setup_routes(app: APIApplication):
    """Add app routes here"""
    # Auth API
    app.router.register_viewset('/auth', auth_vws.AuthViewSet())
    # User API
    app.router.register_viewset('/user', user_vws.UserViewSet())
    # Root redirection to Swagger
    redirect = app.router.add_resource('/', name='home_redirect')
    redirect.add_route('*', swagger_redirect)


def create_api_app():
    sentry = get_sentry_middleware(settings.SENTRY_CONNECT_STRING,
                                   settings.SENTRY_ENVIRONMENT)
    middlewares = [sentry, token_authentication, request_data_handler]
    api_app = APIApplication(name='api', middlewares=middlewares,
                             client_max_size=10*(1024**2))  # 10 MiB body limit

    api_app.on_startup.append(startup.startup_api)
    api_app.on_shutdown.append(startup.shutdown_api)
    api_app.on_cleanup.append(startup.cleanup_api)

    setup_routes(api_app)
    # Return the app so that web.run_app() below receives it
    return api_app

if __name__ == '__main__':
    app = create_api_app()
    web.run_app(app)

